← back to gyzhoe__assistant

Function bodies 138 total

All specs Real LLM only Function bodies
Settings class · python · L8-L60 (53 LOC)
backend/app/config.py
class Settings(BaseSettings):
    model_config = SettingsConfigDict(
        env_file=".env",
        env_file_encoding="utf-8",
        extra="ignore",
    )

    ollama_base_url: str = "http://localhost:11434"
    chroma_path: str = "./chroma_data"
    cors_origin: str = "chrome-extension://placeholder"
    default_model: str = "llama3.2:3b"
    version: str = "1.4.0"
    prompt_template_path: str | None = None

    # Enterprise security
    # Set API_TOKEN in .env to a long random secret (e.g. `openssl rand -hex 32`).
    # The extension must send this in the X-Extension-Token header.
    # Leave empty ("") in dev to disable token auth.
    api_token: str = ""

    # Rate limiting: max /generate calls per client IP per minute
    rate_limit_per_minute: int = 20

    # Request body size limit in bytes (default 64 KB)
    max_request_bytes: int = 65_536

    # Max upload file size in bytes (default 50 MB)
    max_upload_bytes: int = 52_428_800

    @model_validator(mode="after")
    d
reject_wildcard_cors_with_token method · python · L38-L60 (23 LOC)
backend/app/config.py
    def reject_wildcard_cors_with_token(self) -> Self:
        """Reject CORS_ORIGIN=* when API_TOKEN is set.

        Combining a wildcard CORS origin with token-based authentication is a
        security misconfiguration: any website can issue credentialed requests
        against the backend, making the token the only line of defence.  In
        production (api_token non-empty) a specific extension origin must be
        provided instead.
        """
        if self.api_token and self.cors_origin == "*":
            raise ValueError(
                "CORS_ORIGIN='*' is not allowed when API_TOKEN is set. "
                "Set CORS_ORIGIN to your extension origin "
                "(e.g. chrome-extension://<ID>) to secure the backend."
            )
        if not self.api_token and self.cors_origin == "*":
            warnings.warn(
                "CORS_ORIGIN='*' is set with no API_TOKEN — acceptable for "
                "local development only. Never use this configuration in "
JSONFormatter class · python · L9-L21 (13 LOC)
backend/app/logging_config.py
class JSONFormatter(logging.Formatter):
    """JSON log formatter for structured logging."""

    def format(self, record: logging.LogRecord) -> str:
        log_entry: dict[str, object] = {
            "timestamp": datetime.now(UTC).isoformat(),
            "level": record.levelname,
            "logger": record.name,
            "message": record.getMessage(),
        }
        if record.exc_info and record.exc_info[0] is not None:
            log_entry["exception"] = self.formatException(record.exc_info)
        return json.dumps(log_entry)
format method · python · L12-L21 (10 LOC)
backend/app/logging_config.py
    def format(self, record: logging.LogRecord) -> str:
        log_entry: dict[str, object] = {
            "timestamp": datetime.now(UTC).isoformat(),
            "level": record.levelname,
            "logger": record.name,
            "message": record.getMessage(),
        }
        if record.exc_info and record.exc_info[0] is not None:
            log_entry["exception"] = self.formatException(record.exc_info)
        return json.dumps(log_entry)
setup_logging function · python · L24-L37 (14 LOC)
backend/app/logging_config.py
def setup_logging(level: str = "INFO") -> None:
    """Install a single JSON-formatting stdout handler on the root logger.

    Args:
        level: Name of a ``logging`` level (case-insensitive).  A name that is
            not an attribute of the ``logging`` module falls back to ``INFO``.
    """
    json_handler = logging.StreamHandler(sys.stdout)
    json_handler.setFormatter(JSONFormatter())

    root_logger = logging.getLogger()
    resolved_level = getattr(logging, level.upper(), logging.INFO)
    root_logger.setLevel(resolved_level)
    # Drop any previously installed root handlers before adding ours.
    root_logger.handlers.clear()
    root_logger.addHandler(json_handler)

    # Quiet noisy libraries
    for noisy_logger in ("uvicorn.access", "chromadb", "httpx"):
        logging.getLogger(noisy_logger).setLevel(logging.WARNING)
lifespan function · python · L25-L45 (21 LOC)
backend/app/main.py
async def lifespan(app: FastAPI) -> AsyncGenerator[None]:
    """Prepare application state at startup, then hand control to the app.

    Creates the persistent ChromaDB client (stored on
    ``app.state.chroma_client``) and probes Ollama's ``/api/tags`` endpoint,
    recording the outcome on ``app.state.ollama_reachable``.  A failed probe
    is only logged; it never prevents startup.
    """
    logger.info("Starting AI Helpdesk Assistant backend v%s", settings.version)

    db_path = settings.chroma_path
    app.state.chroma_client = chromadb.PersistentClient(path=db_path)
    logger.info("ChromaDB initialized at %s", db_path)

    ollama_url = settings.ollama_base_url
    app.state.ollama_reachable = False
    try:
        async with httpx.AsyncClient() as probe:
            tags_response = await probe.get(f"{ollama_url}/api/tags", timeout=5.0)
    except Exception:
        # Any failure while probing counts as "not reachable".
        app.state.ollama_reachable = False
    else:
        app.state.ollama_reachable = tags_response.status_code == 200

    if not app.state.ollama_reachable:
        logger.warning("Ollama not reachable at %s", ollama_url)
    else:
        logger.info("Ollama reachable at %s", ollama_url)

    yield
create_app function · python · L48-L86 (39 LOC)
backend/app/main.py
def create_app() -> FastAPI:
    app = FastAPI(
        title="AI Helpdesk Assistant Backend",
        version=settings.version,
        lifespan=lifespan,
    )

    # Middleware is applied in reverse order (last added = outermost wrapper)
    # Execution order for a request: SecurityHeaders → CORS → SizeLimit → RateLimit → APIToken → router

    app.add_middleware(SecurityHeadersMiddleware)

    app.add_middleware(
        CORSMiddleware,
        allow_origins=[settings.cors_origin],
        allow_credentials=False,
        allow_methods=["GET", "POST"],
        allow_headers=["Content-Type", "X-Extension-Token"],
    )

    app.add_middleware(
        RequestSizeLimitMiddleware,
        max_bytes=settings.max_request_bytes,
        exempt_paths={"/ingest/upload"},
    )

    app.add_middleware(
        RateLimitMiddleware,
        max_per_minute=settings.rate_limit_per_minute,
    )

    app.add_middleware(APITokenMiddleware)

    app.include_router(health.router)
    app.include_ro
If a scraper extracted this row, it came from Repobility (https://repobility.com)
APITokenMiddleware class · python · L31-L64 (34 LOC)
backend/app/middleware/security.py
class APITokenMiddleware(BaseHTTPMiddleware):
    """
    Validates the X-Extension-Token header on all non-health requests.

    The extension sends a shared secret configured in both:
      - Backend: API_TOKEN env var (required in production)
      - Extension: stored in chrome.storage.local (never synced)

    /health is exempt so operators can monitor without the token.
    """

    EXEMPT_PATHS = {"/health", "/docs", "/openapi.json"}

    def __init__(self, app: ASGIApp) -> None:
        super().__init__(app)
        self._token = settings.api_token

    async def dispatch(self, request: Request, call_next: Callable[[Request], Awaitable[Response]]) -> Response:
        if request.url.path in self.EXEMPT_PATHS:
            return await call_next(request)

        # If no token is configured, skip auth (dev mode only)
        if not self._token:
            return await call_next(request)

        provided = request.headers.get("X-Extension-Token", "")
        if not provided or no
__init__ method · python · L44-L46 (3 LOC)
backend/app/middleware/security.py
    def __init__(self, app: ASGIApp) -> None:
        """Wrap *app* and capture the configured API token."""
        super().__init__(app)
        # Read once at construction; an empty value makes dispatch() skip auth.
        self._token = settings.api_token
dispatch method · python · L48-L64 (17 LOC)
backend/app/middleware/security.py
    async def dispatch(self, request: Request, call_next: Callable[[Request], Awaitable[Response]]) -> Response:
        """Reject requests that do not carry the configured extension token.

        Requests to ``EXEMPT_PATHS`` always pass through, and when no token is
        configured every request passes through (dev mode).  Otherwise the
        ``X-Extension-Token`` header must equal the configured token.

        Returns:
            The downstream response, or a 401 JSON response when the header is
            missing or does not match.
        """
        if request.url.path in self.EXEMPT_PATHS:
            return await call_next(request)

        # If no token is configured, skip auth (dev mode only)
        if not self._token:
            return await call_next(request)

        provided = request.headers.get("X-Extension-Token", "")
        # compare_digest() raises TypeError for str arguments that contain
        # non-ASCII characters, so a crafted header would surface as a 500.
        # Comparing encoded bytes keeps the check constant-time and never raises.
        token_matches = bool(provided) and secrets.compare_digest(
            provided.encode("utf-8"), self._token.encode("utf-8")
        )
        if not token_matches:
            client_host = request.client.host if request.client else "unknown"
            logger.warning("Auth failure on %s from %s", request.url.path, client_host)
            return JSONResponse(
                status_code=401,
                content={"detail": "Unauthorized. Missing or invalid X-Extension-Token header."},
            )

        return await call_next(request)
RateLimitMiddleware class · python · L74-L145 (72 LOC)
backend/app/middleware/security.py
class RateLimitMiddleware(BaseHTTPMiddleware):
    """
    Simple in-process rate limiter.
    Limits /generate and /ingest/upload to configurable requests per client IP.

    Memory management: a periodic sweep (at most once per window) evicts entries
    for IPs whose most-recent request is older than the window.  This prevents
    unbounded growth of ``_counts`` when the server receives traffic from many
    distinct source IPs over time.
    """

    RATE_LIMITED_PATHS = {"/generate", "/ingest/upload"}

    def __init__(self, app: ASGIApp, max_per_minute: int = 20) -> None:
        super().__init__(app)
        self._max = max_per_minute
        self._window = 60.0  # seconds
        # Keyed by "{path}:{ip}" to track per-path, per-IP rate limits
        self._counts: dict[str, list[float]] = defaultdict(list)
        self._lock = asyncio.Lock()
        self._last_sweep: float = 0.0  # monotonic timestamp of the most-recent cleanup sweep
        self._path_limits: dict[str, int] = {
__init__ method · python · L87-L98 (12 LOC)
backend/app/middleware/security.py
    def __init__(self, app: ASGIApp, max_per_minute: int = 20) -> None:
        """Set up per-path limits and the request-timestamp bookkeeping.

        Args:
            app: The wrapped ASGI application.
            max_per_minute: Limit applied to ``/generate``; also kept as the
                fallback limit for paths without an explicit entry.
        """
        super().__init__(app)

        # Limits
        self._max = max_per_minute
        self._path_limits: dict[str, int] = {}
        self._path_limits["/generate"] = max_per_minute
        self._path_limits["/ingest/upload"] = INGEST_RATE_LIMIT

        # Request-timestamp bookkeeping
        self._window = 60.0  # seconds
        # Keyed by "{path}:{ip}" to track per-path, per-IP rate limits
        self._counts: dict[str, list[float]] = defaultdict(list)
        self._last_sweep: float = 0.0  # monotonic timestamp of the most-recent cleanup sweep
        self._lock = asyncio.Lock()
_evict_stale_entries method · python · L100-L114 (15 LOC)
backend/app/middleware/security.py
    def _evict_stale_entries(self, now: float) -> None:
        """Drop rate-limit keys whose newest timestamp has left the window.

        Must be called while ``self._lock`` is held.  The sweep is skipped
        unless at least one full window has elapsed since the previous sweep,
        which keeps the amortised per-request cost small.

        Args:
            now: Current monotonic time, in seconds.
        """
        elapsed_since_sweep = now - self._last_sweep
        if elapsed_since_sweep < self._window:
            return

        # Collect first, delete afterwards: the dict must not change size
        # while it is being iterated.
        expired_keys = []
        for rate_key, stamps in self._counts.items():
            if not stamps or now - stamps[-1] >= self._window:
                expired_keys.append(rate_key)
        for rate_key in expired_keys:
            del self._counts[rate_key]

        self._last_sweep = now
dispatch method · python · L116-L145 (30 LOC)
backend/app/middleware/security.py
    async def dispatch(self, request: Request, call_next: Callable[[Request], Awaitable[Response]]) -> Response:
        if request.url.path not in self.RATE_LIMITED_PATHS:
            return await call_next(request)

        client_ip = request.client.host if request.client else "unknown"
        rate_key = f"{request.url.path}:{client_ip}"
        limit = self._path_limits.get(request.url.path, self._max)
        now = time.monotonic()

        async with self._lock:
            # Periodic cleanup: evict stale IP entries to bound memory usage.
            self._evict_stale_entries(now)

            timestamps = self._counts[rate_key]
            # Remove timestamps outside the window
            self._counts[rate_key] = [t for t in timestamps if now - t < self._window]

            if len(self._counts[rate_key]) >= limit:
                logger.warning("Rate limit exceeded for %s on %s", client_ip, request.url.path)
                return JSONResponse(
                    status_code
RequestSizeLimitMiddleware class · python · L152-L189 (38 LOC)
backend/app/middleware/security.py
class RequestSizeLimitMiddleware(BaseHTTPMiddleware):
    """
    Rejects requests whose body exceeds max_bytes.
    Prevents oversized payloads from being forwarded to Ollama.
    Default: 64 KB — sufficient for the largest reasonable ticket description.
    """

    def __init__(
        self,
        app: ASGIApp,
        max_bytes: int = 65_536,
        exempt_paths: set[str] | None = None,
    ) -> None:
        super().__init__(app)
        self._max_bytes = max_bytes
        self._exempt_paths = exempt_paths or set()

    async def dispatch(self, request: Request, call_next: Callable[[Request], Awaitable[Response]]) -> Response:
        if request.url.path in self._exempt_paths:
            return await call_next(request)
        content_length = request.headers.get("content-length")
        if content_length:
            if int(content_length) > self._max_bytes:
                return self._too_large()
        elif request.method in {"POST", "PUT", "PATCH"}:
            body = 
Repobility · open methodology · https://repobility.com/research/
__init__ method · python · L159-L167 (9 LOC)
backend/app/middleware/security.py
    def __init__(
        self,
        app: ASGIApp,
        max_bytes: int = 65_536,
        exempt_paths: set[str] | None = None,
    ) -> None:
        """Store the size limit and the set of paths excluded from it.

        Args:
            app: The wrapped ASGI application.
            max_bytes: Largest request body, in bytes, that is let through.
            exempt_paths: Request paths that bypass the check; ``None`` or an
                empty set means no path is exempt.
        """
        super().__init__(app)
        self._exempt_paths = exempt_paths if exempt_paths else set()
        self._max_bytes = max_bytes
dispatch method · python · L169-L180 (12 LOC)
backend/app/middleware/security.py
    async def dispatch(self, request: Request, call_next: Callable[[Request], Awaitable[Response]]) -> Response:
        """Reject requests whose body is larger than the configured limit.

        Exempt paths are passed straight through.  When a ``Content-Length``
        header is present only the declared size is checked; otherwise, for
        POST/PUT/PATCH requests, the body is read and measured.

        Returns:
            The downstream response, a 413 response when the body is too
            large, or a 400 response when ``Content-Length`` is not a
            non-negative integer.
        """
        if request.url.path in self._exempt_paths:
            return await call_next(request)

        content_length = request.headers.get("content-length")
        if content_length:
            # The header is client-controlled: a non-numeric value must not
            # escape as an unhandled ValueError (500), and a negative value
            # must not slip under the limit.
            try:
                declared_size = int(content_length)
            except ValueError:
                declared_size = -1
            if declared_size < 0:
                return JSONResponse(
                    status_code=400,
                    content={
                        "detail": "Invalid Content-Length header.",
                        "error_code": "INVALID_CONTENT_LENGTH",
                    },
                )
            if declared_size > self._max_bytes:
                return self._too_large()
        elif request.method in {"POST", "PUT", "PATCH"}:
            # No declared length: measure the body itself.
            body = await request.body()
            if len(body) > self._max_bytes:
                return self._too_large()
        return await call_next(request)
_too_large method · python · L182-L189 (8 LOC)
backend/app/middleware/security.py
    def _too_large(self) -> JSONResponse:
        """Build the 413 response sent when a request body exceeds the limit."""
        payload = {
            "detail": f"Request body too large. Max {self._max_bytes} bytes.",
            "error_code": "PAYLOAD_TOO_LARGE",
        }
        return JSONResponse(status_code=413, content=payload)
SecurityHeadersMiddleware class · python · L196-L208 (13 LOC)
backend/app/middleware/security.py
class SecurityHeadersMiddleware(BaseHTTPMiddleware):
    """Stamps a fixed set of defensive HTTP headers onto every response."""

    async def dispatch(self, request: Request, call_next: Callable[[Request], Awaitable[Response]]) -> Response:
        """Run the downstream handler, then harden the outgoing headers."""
        outgoing = await call_next(request)

        hardening = (
            ("X-Content-Type-Options", "nosniff"),
            ("X-Frame-Options", "DENY"),
            ("Cache-Control", "no-store"),
            ("Referrer-Policy", "no-referrer"),
        )
        for header_name, header_value in hardening:
            outgoing.headers[header_name] = header_value

        # Remove server version banner
        if "server" in outgoing.headers:
            del outgoing.headers["server"]
        return outgoing
dispatch method · python · L199-L208 (10 LOC)
backend/app/middleware/security.py
    async def dispatch(self, request: Request, call_next: Callable[[Request], Awaitable[Response]]) -> Response:
        """Call the downstream handler and set defensive headers on its response.

        Existing values for the listed headers are overwritten, and a
        ``server`` header, if present, is removed.
        """
        result = await call_next(request)
        fixed_headers = {
            "X-Content-Type-Options": "nosniff",
            "X-Frame-Options": "DENY",
            "Cache-Control": "no-store",
            "Referrer-Policy": "no-referrer",
        }
        for key, value in fixed_headers.items():
            result.headers[key] = value
        # Remove server version banner
        if "server" in result.headers:
            del result.headers["server"]
        return result
GenerateRequest class · python · L11-L26 (16 LOC)
backend/app/models/request_models.py
# Request body for reply generation (consumed by generate_reply).
# Every field has a default, so an empty JSON object validates.
class GenerateRequest(BaseModel):
    # Ticket fields; subject and description are also combined into the
    # retrieval query in generate_reply.
    ticket_subject: str = Field(
        default="", max_length=_SUBJECT_MAX, description="Ticket subject line"
    )
    ticket_description: str = Field(
        default="", max_length=_DESCRIPTION_MAX, description="Full problem description"
    )
    requester_name: str = Field(default="", max_length=_SHORT_FIELD_MAX, description="Requester's name")
    category: str = Field(default="", max_length=_SHORT_FIELD_MAX, description="WHD ticket category")
    status: str = Field(default="", max_length=_SHORT_FIELD_MAX, description="WHD ticket status")
    # NOTE(review): this default duplicates the literal in Settings.default_model — keep them in sync.
    model: str = Field(default="llama3.2:3b", max_length=_MODEL_MAX, description="Ollama model to use")
    # Passed to the RAG retrieval call as max_docs; validated to 1..20.
    max_context_docs: int = Field(default=5, ge=1, le=20)
    # NOTE(review): presumably toggles streamed responses — how it is consumed is not visible here.
    stream: bool = Field(default=False)
    prompt_suffix: str = Field(
        default="", max_length=2000, description="Custom instructions appended to the prompt"
    )
ContextDoc class · python · L4-L8 (5 LOC)
backend/app/models/response_models.py
# One retrieved context document, as listed in GenerateResponse.context_docs.
class ContextDoc(BaseModel):
    # Text of the retrieved document.
    content: str
    source: str  # "kb" | "ticket"
    # NOTE(review): scale and direction of the score are not visible here — confirm with the retriever.
    score: float
    # Free-form metadata attached to the document.
    metadata: dict[str, object]
GenerateResponse class · python · L11-L15 (5 LOC)
backend/app/models/response_models.py
# Response body for reply generation (the return type of generate_reply).
class GenerateResponse(BaseModel):
    # The generated reply text.
    reply: str
    # Name of the model that produced the reply.
    model_used: str
    # Context documents that accompanied the request to the model.
    context_docs: list[ContextDoc]
    # NOTE(review): presumably elapsed time of the whole request in milliseconds — confirm in generate_reply.
    latency_ms: int
Generated by Repobility's multi-pass static-analysis pipeline (https://repobility.com)
IngestUploadResponse class · python · L18-L23 (6 LOC)
backend/app/models/response_models.py
# Response body for a file upload (the return type of upload_file).
class IngestUploadResponse(BaseModel):
    # Name of the uploaded file.
    filename: str
    # Name of the collection the file was ingested into.
    collection: str
    # Number of chunks stored for this file.
    chunks_ingested: int
    # NOTE(review): presumably wall-clock processing time in milliseconds — confirm in upload_file.
    processing_time_ms: int
    warning: str | None = None  # e.g., "No text content extracted"
generate_reply function · python · L17-L72 (56 LOC)
backend/app/routers/generate.py
async def generate_reply(body: GenerateRequest, request: Request) -> GenerateResponse:
    """Retrieve RAG context and generate a helpdesk reply via Ollama."""
    chroma_client = request.app.state.chroma_client
    rag = RAGService(chroma_client=chroma_client)
    llm = LLMService()

    logger.info("Generate request: model=%s subject=%s", body.model, body.ticket_subject[:80])
    start = time.monotonic()

    # Retrieve context
    query = f"{body.ticket_subject}\n\n{body.ticket_description}".strip()
    try:
        context_docs = await rag.retrieve(
            query=query or "general helpdesk inquiry",
            max_docs=body.max_context_docs,
        )
    except ConnectionError as exc:
        raise HTTPException(
            status_code=503,
            detail={
                "detail": str(exc),
                "error_code": "OLLAMA_DOWN",
            },
        ) from exc

    # Build prompt
    context_text = "\n\n---\n\n".join(
        f"[{doc.source.upper()} | score: {d
_build_prompt function · python · L75-L94 (20 LOC)
backend/app/routers/generate.py
def _build_prompt(body: GenerateRequest, context_text: str) -> str:
    """Render the LLM prompt for a ticket and its retrieved context.

    An empty subject or description is shown as ``(not available)``; an empty
    *context_text* is replaced by a "no relevant context" sentence.  The other
    ticket fields are inserted exactly as received.
    """
    if body.ticket_subject:
        subject = body.ticket_subject
    else:
        subject = "(not available)"
    if body.ticket_description:
        description = body.ticket_description
    else:
        description = "(not available)"
    return f"""You are a helpful IT helpdesk assistant. Answer the technician's query based on the ticket context and retrieved knowledge below.

TICKET
Subject: {subject}
Requester: {body.requester_name} | Category: {body.category} | Status: {body.status}
Description: {description}

RELEVANT CONTEXT
{context_text if context_text else "No relevant context found in knowledge base."}

INSTRUCTIONS
- Write a professional, empathetic reply to the requester
- Reference specific steps from the knowledge base when available
- Keep the reply concise (under 200 words unless complexity demands more)
- Do not hallucinate software versions or ticket numbers

REPLY:"""
health_check function · python · L16-L43 (28 LOC)
backend/app/routers/health.py
async def health_check(request: Request) -> dict[str, object]:
    """Return system health status including Ollama and ChromaDB state."""
    ollama_reachable = False
    try:
        async with httpx.AsyncClient() as client:
            resp = await client.get(f"{settings.ollama_base_url}/api/tags", timeout=5.0)
            ollama_reachable = resp.status_code == 200
    except Exception:
        ollama_reachable = False

    chroma_ready = False
    chroma_doc_counts: dict[str, int] = {}
    try:
        chroma_client = request.app.state.chroma_client
        collections = chroma_client.list_collections()
        chroma_ready = True
        for col in collections:
            chroma_doc_counts[col.name] = col.count()
    except Exception:
        chroma_ready = False

    return {
        "status": "ok" if ollama_reachable and chroma_ready else "degraded",
        "ollama_reachable": ollama_reachable,
        "chroma_ready": chroma_ready,
        "chroma_doc_counts": chroma_doc_counts
shutdown_backend function · python · L47-L55 (9 LOC)
backend/app/routers/health.py
async def shutdown_backend() -> dict[str, str]:
    """Gracefully shut down the backend server after a short delay.

    Schedules a SIGTERM to this process 0.5 seconds from now and returns
    immediately.

    Returns:
        ``{"status": "shutting_down"}``.
    """
    # Use a loop timer instead of a fire-and-forget create_task(): the event
    # loop only keeps weak references to tasks, so a task whose result is not
    # stored anywhere may be garbage-collected before it finishes.  A timer
    # handle is owned by the loop until it fires.
    loop = asyncio.get_running_loop()
    loop.call_later(0.5, os.kill, os.getpid(), signal.SIGTERM)
    return {"status": "shutting_down"}
start_ollama function · python · L59-L83 (25 LOC)
backend/app/routers/health.py
async def start_ollama() -> dict[str, str]:
    """Start the Ollama server as a detached background process.

    Returns:
        ``{"status": "already_running"}`` when ``/api/tags`` already answers
        with HTTP 200, otherwise ``{"status": "starting"}`` after spawning
        ``ollama serve``.
    """
    # Check if already running
    try:
        async with httpx.AsyncClient() as client:
            resp = await client.get(f"{settings.ollama_base_url}/api/tags", timeout=3.0)
            if resp.status_code == 200:
                return {"status": "already_running"}
    except Exception:
        # Best-effort probe: any failure simply means we try to start it.
        pass

    # stdin is detached as well so the child never shares the backend's input.
    if sys.platform == "win32":
        subprocess.Popen(
            ["ollama", "serve"],
            stdin=subprocess.DEVNULL,
            stdout=subprocess.DEVNULL,
            stderr=subprocess.DEVNULL,
            creationflags=subprocess.CREATE_NO_WINDOW,
        )
    else:
        subprocess.Popen(
            ["ollama", "serve"],
            stdin=subprocess.DEVNULL,
            stdout=subprocess.DEVNULL,
            stderr=subprocess.DEVNULL,
            # Run in its own session so the child is really detached: signals
            # aimed at the backend's process group no longer reach it.
            start_new_session=True,
        )
    return {"status": "starting"}
stop_ollama function · python · L87-L104 (18 LOC)
backend/app/routers/health.py
async def stop_ollama() -> dict[str, str]:
    """Stop the Ollama server process.

    On Windows both ``ollama.exe`` and ``ollama_llama_server.exe`` are
    force-killed with ``taskkill``; elsewhere ``pkill -f ollama`` is used.
    Failures of the kill commands are ignored (``check=False``).

    Returns:
        ``{"status": "stopping"}``.
    """
    if sys.platform == "win32":
        commands = [
            ["taskkill", "/IM", "ollama.exe", "/F"],
            ["taskkill", "/IM", "ollama_llama_server.exe", "/F"],
        ]
    else:
        # NOTE(review): "-f" matches the pattern against whole command lines,
        # so any process mentioning "ollama" is signalled — confirm intended.
        commands = [["pkill", "-f", "ollama"]]

    for command in commands:
        # subprocess.run() blocks until the tool exits; run it in a worker
        # thread so the event loop keeps serving other requests meanwhile.
        await asyncio.to_thread(
            subprocess.run,
            command,
            stdout=subprocess.DEVNULL,
            stderr=subprocess.DEVNULL,
            check=False,
        )
    return {"status": "stopping"}
upload_file function · python · L34-L136 (103 LOC)
backend/app/routers/ingest.py
async def upload_file(request: Request, file: UploadFile) -> IngestUploadResponse:
    """Upload a single file for ingestion into ChromaDB."""
    # Validate filename
    if not file.filename:
        return JSONResponse(  # type: ignore[return-value]
            status_code=422,
            content={"detail": "No filename provided."},
        )

    # Sanitize filename (strip directory components)
    safe_name = PurePosixPath(file.filename).name
    suffix = Path(safe_name).suffix.lower()

    if suffix not in ALLOWED_EXTENSIONS:
        return JSONResponse(  # type: ignore[return-value]
            status_code=422,
            content={
                "detail": (
                    f"Unsupported file type: {suffix}. "
                    f"Allowed: {', '.join(sorted(ALLOWED_EXTENSIONS))}"
                ),
            },
        )

    # Try to acquire the upload semaphore (non-blocking)
    if not _upload_semaphore._value:  # noqa: SLF001
        return JSONResponse(  # type: ig
Methodology: Repobility · https://repobility.com/research/state-of-ai-code-2026/
clear_collection function · python · L140-L161 (22 LOC)
backend/app/routers/ingest.py
async def clear_collection(request: Request, name: str) -> dict[str, str]:
    """Drop the named collection from ChromaDB.

    Only names listed in ``ALLOWED_COLLECTIONS`` are accepted; anything else
    yields a 404 JSON response.  A ``ValueError`` from the delete call is
    treated as "collection did not exist", which makes the call idempotent.
    """
    if name in ALLOWED_COLLECTIONS:
        client = request.app.state.chroma_client
        try:
            # The delete call is synchronous; keep it off the event loop.
            await asyncio.to_thread(client.delete_collection, name)
        except ValueError:
            # Collection doesn't exist — that's fine, idempotent
            pass
        return {"status": "ok", "collection": name}

    return JSONResponse(  # type: ignore[return-value]
        status_code=404,
        content={
            "detail": (
                f"Unknown collection: {name}. "
                f"Allowed: {', '.join(sorted(ALLOWED_COLLECTIONS))}"
            ),
        },
    )
_stream_to_temp function · python · L164-L195 (32 LOC)
backend/app/routers/ingest.py
async def _stream_to_temp(
    file: UploadFile, safe_name: str, suffix: str,
) -> Path:
    """Stream uploaded file to a temp file, enforcing size limit.

    Args:
        file: The upload to read, in chunks of ``_CHUNK_SIZE``.
        safe_name: File name embedded in the temp file's prefix.
        suffix: Suffix given to the temp file.

    Returns:
        Path of the closed temp file.  It is created with ``delete=False``,
        so it persists until something removes it.

    Raises:
        _PayloadTooLargeError: If more than ``settings.max_upload_bytes``
            bytes are received.  The temp file is removed first.
    """
    max_bytes = settings.max_upload_bytes
    written = 0

    tmp = tempfile.NamedTemporaryFile(
        delete=False, suffix=suffix, prefix=f"ingest_{safe_name}_",
    )
    tmp_path = Path(tmp.name)

    try:
        # The with-block closes the handle on every exit path, before any
        # cleanup tries to unlink the file.
        with tmp:
            while chunk := await file.read(_CHUNK_SIZE):
                written += len(chunk)
                if written > max_bytes:
                    raise _PayloadTooLargeError(max_bytes)
                tmp.write(chunk)
    except BaseException:
        # BaseException, not Exception: task cancellation raises
        # asyncio.CancelledError, which would otherwise skip the cleanup and
        # leave the partial file behind.
        _cleanup_temp(tmp_path)
        raise

    return tmp_path
_PayloadTooLargeError class · python · L198-L203 (6 LOC)
backend/app/routers/ingest.py
class _PayloadTooLargeError(Exception):
    """Signals that a streamed upload grew past the size limit.

    Attributes:
        max_bytes: The limit, in bytes, that was exceeded.
    """

    def __init__(self, max_bytes: int) -> None:
        message = f"File exceeds maximum upload size of {max_bytes} bytes."
        super().__init__(message)
        self.max_bytes = max_bytes
__init__ method · python · L201-L203 (3 LOC)
backend/app/routers/ingest.py
    def __init__(self, max_bytes: int) -> None:
        self.max_bytes = max_bytes
        super().__init__(f"File exceeds maximum upload size of {max_bytes} bytes.")
_cleanup_temp function · python · L206-L215 (10 LOC)
backend/app/routers/ingest.py
def _cleanup_temp(path: Path, retries: int = 3, delay: float = 0.5) -> None:
    """Delete *path*, retrying on ``OSError`` (e.g. Windows AV locks).

    Args:
        path: File to remove; a missing file counts as success.
        retries: Total number of unlink attempts.
        delay: Seconds slept between two attempts.

    A warning is logged when every attempt failed; nothing is raised.
    """
    attempts_made = 0
    while attempts_made < retries:
        try:
            path.unlink(missing_ok=True)
        except OSError:
            attempts_made += 1
            # No point sleeping after the final attempt.
            if attempts_made < retries:
                time.sleep(delay)
        else:
            return
    logger.warning("Could not delete temp file: %s", path)
list_models function · python · L12-L25 (14 LOC)
backend/app/routers/models.py
async def list_models() -> dict[str, list[str]]:
    """Proxy Ollama's /api/tags to return available model names.

    Returns:
        ``{"models": [...]}`` with the ``name`` of every listed model.

    Raises:
        HTTPException: 503 with error code ``OLLAMA_DOWN`` when the request
            fails or the response does not have the expected shape.
    """
    try:
        async with httpx.AsyncClient() as client:
            resp = await client.get(f"{settings.ollama_base_url}/api/tags", timeout=10.0)
            resp.raise_for_status()
            data = resp.json()
            # A present-but-null "models" value must count as "no models":
            # .get("models", []) would return None and fail on iteration.
            models = data.get("models") or []
            model_names = [m["name"] for m in models]
            return {"models": model_names}
    except (httpx.HTTPError, json.JSONDecodeError, KeyError, TypeError, AttributeError) as exc:
        # TypeError/AttributeError cover payloads that are valid JSON but not
        # the expected object-with-list shape; they get the same 503.
        raise HTTPException(
            status_code=503,
            detail={"detail": "Cannot reach Ollama to list models.", "error_code": "OLLAMA_DOWN"},
        ) from exc
EmbedService class · python · L15-L72 (58 LOC)
backend/app/services/embed_service.py
class EmbedService:
    """Generates embeddings using nomic-embed-text via Ollama."""

    def __init__(self, model: str = "nomic-embed-text") -> None:
        self.model = model
        self.base_url = settings.ollama_base_url

    async def embed(self, text: str) -> list[float]:
        """Return the embedding vector with retry logic."""
        last_error: ConnectionError | None = None
        for attempt in range(1 + MAX_RETRIES):
            try:
                result = await asyncio.to_thread(self._embed_sync, text)
                if attempt > 0:
                    logger.info("Ollama embed succeeded on attempt %d", attempt + 1)
                return result
            except ConnectionError as exc:
                last_error = exc
                if attempt < MAX_RETRIES:
                    logger.warning(
                        "Ollama embed attempt %d failed, retrying in %.1fs: %s",
                        attempt + 1, RETRY_DELAY, exc,
                    )
            
__init__ method · python · L18-L20 (3 LOC)
backend/app/services/embed_service.py
    def __init__(self, model: str = "nomic-embed-text") -> None:
        """Store the embedding model name and the Ollama base URL.

        Args:
            model: Name of the Ollama model sent with each embedding request.
        """
        self.model = model
        # Captured from settings once, at construction time.
        self.base_url = settings.ollama_base_url
If a scraper extracted this row, it came from Repobility (https://repobility.com)
embed method · python · L22-L40 (19 LOC)
backend/app/services/embed_service.py
    async def embed(self, text: str) -> list[float]:
        """Return the embedding vector for *text*, retrying on connection errors.

        The blocking ``_embed_sync`` call runs in a worker thread.  Up to
        ``1 + MAX_RETRIES`` attempts are made, ``RETRY_DELAY`` seconds apart.

        Raises:
            ConnectionError: The error from the final attempt, once every
                attempt has failed.
        """
        last_error: ConnectionError | None = None
        for attempt_no in range(1, MAX_RETRIES + 2):
            try:
                vector = await asyncio.to_thread(self._embed_sync, text)
            except ConnectionError as exc:
                last_error = exc
                if attempt_no > MAX_RETRIES:
                    # Final attempt: nothing left to wait for.
                    continue
                logger.warning(
                    "Ollama embed attempt %d failed, retrying in %.1fs: %s",
                    attempt_no, RETRY_DELAY, exc,
                )
                await asyncio.sleep(RETRY_DELAY)
            else:
                if attempt_no > 1:
                    logger.info("Ollama embed succeeded on attempt %d", attempt_no)
                return vector
        logger.error("Ollama embed failed after %d attempts", 1 + MAX_RETRIES)
        raise last_error  # type: ignore[misc]
_embed_sync method · python · L42-L72 (31 LOC)
backend/app/services/embed_service.py
    def _embed_sync(self, text: str) -> list[float]:
        try:
            with httpx.Client() as client:
                resp = client.post(
                    f"{self.base_url}/api/embeddings",
                    json={"model": self.model, "prompt": text},
                    timeout=30.0,
                )
                resp.raise_for_status()
                data = resp.json()
                if "embedding" not in data:
                    raise ConnectionError(
                        f"Ollama embed response missing 'embedding' key: {list(data.keys())}"
                    )
                return list(data["embedding"])
        except httpx.ConnectError as exc:
            raise ConnectionError(
                f"Ollama embed service unreachable at {self.base_url}"
            ) from exc
        except httpx.TimeoutException as exc:
            raise ConnectionError(
                f"Ollama embed request timed out at {self.base_url}"
            ) from exc
        except 
LLMService class · python · L15-L66 (52 LOC)
backend/app/services/llm_service.py
class LLMService:
    """Generates text completions via Ollama."""

    def __init__(self) -> None:
        self.base_url = settings.ollama_base_url

    async def generate(self, prompt: str, model: str) -> str:
        """Generate a completion with retry logic. Raises ConnectionError if Ollama is unreachable."""
        last_error: ConnectionError | None = None
        for attempt in range(1 + MAX_RETRIES):
            try:
                result = await asyncio.to_thread(self._generate_sync, prompt, model)
                if attempt > 0:
                    logger.info("Ollama generate succeeded on attempt %d", attempt + 1)
                return result
            except ConnectionError as exc:
                last_error = exc
                if attempt < MAX_RETRIES:
                    logger.warning(
                        "Ollama generate attempt %d failed, retrying in %.1fs: %s",
                        attempt + 1, RETRY_DELAY, exc,
                    )
                    a
generate method · python · L21-L39 (19 LOC)
backend/app/services/llm_service.py
    async def generate(self, prompt: str, model: str) -> str:
        """Return a completion for ``prompt``, retrying on connection failures.

        The synchronous request runs in a worker thread. When it raises
        ``ConnectionError`` the call is repeated after ``RETRY_DELAY`` seconds,
        for at most ``1 + MAX_RETRIES`` attempts in total. Any other exception
        propagates immediately without a retry.

        Raises ConnectionError if Ollama is unreachable on every attempt.
        """
        total_attempts = 1 + MAX_RETRIES
        failure: ConnectionError | None = None
        for attempt_no in range(1, total_attempts + 1):
            try:
                completion = await asyncio.to_thread(self._generate_sync, prompt, model)
            except ConnectionError as exc:
                failure = exc
                if attempt_no < total_attempts:
                    # More attempts remain: report the failure and back off.
                    logger.warning(
                        "Ollama generate attempt %d failed, retrying in %.1fs: %s",
                        attempt_no, RETRY_DELAY, exc,
                    )
                    await asyncio.sleep(RETRY_DELAY)
            else:
                # Only mention success when at least one retry was needed.
                if attempt_no > 1:
                    logger.info("Ollama generate succeeded on attempt %d", attempt_no)
                return completion
        logger.error("Ollama generate failed after %d attempts", total_attempts)
        raise failure  # type: ignore[misc]
_generate_sync method · python · L41-L66 (26 LOC)
backend/app/services/llm_service.py
    def _generate_sync(self, prompt: str, model: str) -> str:
        try:
            with httpx.Client() as client:
                resp = client.post(
                    f"{self.base_url}/api/generate",
                    json={"model": model, "prompt": prompt, "stream": False},
                    timeout=120.0,
                )
                resp.raise_for_status()
                return str(resp.json()["response"])
        except httpx.ConnectError as exc:
            raise ConnectionError(
                f"Ollama service unreachable at {self.base_url}"
            ) from exc
        except httpx.TimeoutException as exc:
            raise ConnectionError(
                f"Ollama request timed out after 120s at {self.base_url}"
            ) from exc
        except httpx.HTTPStatusError as exc:
            raise ConnectionError(
                f"Ollama returned error {exc.response.status_code}"
            ) from exc
        except (json.JSONDecodeError, KeyError) as exc:
RAGService class · python · L10-L73 (64 LOC)
backend/app/services/rag_service.py
class RAGService:
    """Retrieves relevant documents from ChromaDB using semantic search."""

    TICKET_COLLECTION = "whd_tickets"
    KB_COLLECTION = "kb_articles"

    def __init__(self, chroma_client: ClientAPI) -> None:
        self.client = chroma_client
        self.embed_svc = EmbedService()

    async def retrieve(self, query: str, max_docs: int = 5) -> list[ContextDoc]:
        """Embed query, search both collections, merge and rank by score."""
        embedding = await self.embed_svc.embed(query)

        ticket_docs, kb_docs = await asyncio.gather(
            self._query_collection(self.TICKET_COLLECTION, embedding, max_docs // 2 + 1),
            self._query_collection(self.KB_COLLECTION, embedding, max_docs - max_docs // 2),
        )

        all_docs = ticket_docs + kb_docs
        all_docs.sort(key=lambda d: d.score, reverse=True)
        return all_docs[:max_docs]

    async def _query_collection(
        self, name: str, embedding: list[float], n_results: int
    
__init__ method · python · L16-L18 (3 LOC)
backend/app/services/rag_service.py
    def __init__(self, chroma_client: ClientAPI) -> None:
        """Keep the Chroma client and create the embedding service.

        Args:
            chroma_client: Client through which the collections are looked up.
        """
        self.embed_svc = EmbedService()
        self.client = chroma_client
retrieve method · python · L20-L31 (12 LOC)
backend/app/services/rag_service.py
    async def retrieve(self, query: str, max_docs: int = 5) -> list[ContextDoc]:
        """Return the highest-scoring context documents for ``query``.

        The query is embedded once, then the ticket and knowledge-base
        collections are searched concurrently with that embedding. Both result
        lists are merged, ordered by descending ``score`` and trimmed to
        ``max_docs`` entries.

        Args:
            query: Text to embed and search for.
            max_docs: Maximum number of documents to return. Zero or a
                negative value yields an empty list without embedding the
                query or touching the collections.

        Returns:
            At most ``max_docs`` documents, best score first.

        Raises:
            Whatever the embedding call or the collection queries raise; no
            exception is handled here.
        """
        if max_docs <= 0:
            # A non-positive limit would request zero or negative result
            # counts from the collections, and a negative bound in the final
            # slice would drop documents from the wrong end. Nothing can be
            # returned, so skip the work entirely.
            return []

        embedding = await self.embed_svc.embed(query)

        # Split the budget between the two sources. Tickets get one extra
        # slot, so the combined request is max_docs + 1 and the merged list
        # is trimmed after ranking.
        half = max_docs // 2
        ticket_docs, kb_docs = await asyncio.gather(
            self._query_collection(self.TICKET_COLLECTION, embedding, half + 1),
            self._query_collection(self.KB_COLLECTION, embedding, max_docs - half),
        )

        ranked = sorted(ticket_docs + kb_docs, key=lambda d: d.score, reverse=True)
        return ranked[:max_docs]
Repobility · open methodology · https://repobility.com/research/
_query_collection method · python · L33-L36 (4 LOC)
backend/app/services/rag_service.py
    async def _query_collection(
        self, name: str, embedding: list[float], n_results: int
    ) -> list[ContextDoc]:
        """Run the synchronous collection query in a worker thread.

        Hands ``name``, ``embedding`` and ``n_results`` to ``_query_sync`` via
        ``asyncio.to_thread`` and returns its result unchanged.
        """
        matches = await asyncio.to_thread(self._query_sync, name, embedding, n_results)
        return matches
_query_sync method · python · L38-L73 (36 LOC)
backend/app/services/rag_service.py
    def _query_sync(
        self, name: str, embedding: list[float], n_results: int
    ) -> list[ContextDoc]:
        try:
            col = self.client.get_collection(name)
        except Exception:
            return []

        count = col.count()
        if count == 0:
            return []

        n_results = min(n_results, count)
        results: Any = col.query(
            query_embeddings=[embedding],  # type: ignore[arg-type]
            n_results=n_results,
            include=["documents", "metadatas", "distances"],
        )

        docs = []
        for content, meta, distance in zip(
            results["documents"][0],
            results["metadatas"][0],
            results["distances"][0],
        ):
            source = "ticket" if name == self.TICKET_COLLECTION else "kb"
            score = max(0.0, 1.0 - float(distance))
            docs.append(
                ContextDoc(
                    content=content,
                    source=source,
                 
chunk_by_tokens function · python · L4-L25 (22 LOC)
backend/app/utils/chunker.py
def chunk_by_tokens(
    text: str, max_tokens: int = 500, overlap_tokens: int = 50
) -> list[str]:
    """Split ``text`` into overlapping chunks of whitespace-separated words.

    Words stand in for tokens, so chunk sizes are an approximation. Each
    chunk holds up to ``max_tokens`` words and starts
    ``max_tokens - overlap_tokens`` words after the previous one, so
    neighbouring chunks share ``overlap_tokens`` words.

    Args:
        text: Input to split; any run of whitespace separates words.
        max_tokens: Maximum number of words per chunk.
        overlap_tokens: Number of words repeated at the start of the next
            chunk. Values of ``max_tokens`` or more are treated as
            ``max_tokens - 1`` so the window always moves forward.

    Returns:
        The chunks in order, each joined with single spaces. Empty or
        whitespace-only ``text`` gives an empty list.

    Raises:
        ValueError: If ``text`` contains words and ``max_tokens`` is below 1.
    """
    words = text.split()
    if not words:
        return []
    if max_tokens < 1:
        # A window of zero or fewer words can never reach the end of the text.
        raise ValueError(f"max_tokens must be at least 1, got {max_tokens}")

    # Without the clamp an overlap as large as the window would restart every
    # chunk at the same word and never terminate.
    step = max(1, max_tokens - overlap_tokens)

    chunks: list[str] = []
    for start in range(0, len(words), step):
        end = start + max_tokens
        chunks.append(" ".join(words[start:end]))
        if end >= len(words):
            # This chunk already reaches the last word; stop so that no
            # trailing chunk made only of overlap is emitted.
            break
    return chunks
page 1 / 3next ›