Function bodies 138 total
Settings class · python · L8-L60 (53 LOC)backend/app/config.py
class Settings(BaseSettings):
model_config = SettingsConfigDict(
env_file=".env",
env_file_encoding="utf-8",
extra="ignore",
)
ollama_base_url: str = "http://localhost:11434"
chroma_path: str = "./chroma_data"
cors_origin: str = "chrome-extension://placeholder"
default_model: str = "llama3.2:3b"
version: str = "1.4.0"
prompt_template_path: str | None = None
# Enterprise security
# Set API_TOKEN in .env to a long random secret (e.g. `openssl rand -hex 32`).
# The extension must send this in the X-Extension-Token header.
# Leave empty ("") in dev to disable token auth.
api_token: str = ""
# Rate limiting: max /generate calls per client IP per minute
rate_limit_per_minute: int = 20
# Request body size limit in bytes (default 64 KB)
max_request_bytes: int = 65_536
# Max upload file size in bytes (default 50 MB)
max_upload_bytes: int = 52_428_800
@model_validator(mode="after")
dreject_wildcard_cors_with_token method · python · L38-L60 (23 LOC)backend/app/config.py
def reject_wildcard_cors_with_token(self) -> Self:
"""Reject CORS_ORIGIN=* when API_TOKEN is set.
Combining a wildcard CORS origin with token-based authentication is a
security misconfiguration: any website can issue credentialed requests
against the backend, making the token the only line of defence. In
production (api_token non-empty) a specific extension origin must be
provided instead.
"""
if self.api_token and self.cors_origin == "*":
raise ValueError(
"CORS_ORIGIN='*' is not allowed when API_TOKEN is set. "
"Set CORS_ORIGIN to your extension origin "
"(e.g. chrome-extension://<ID>) to secure the backend."
)
if not self.api_token and self.cors_origin == "*":
warnings.warn(
"CORS_ORIGIN='*' is set with no API_TOKEN — acceptable for "
"local development only. Never use this configuration in "JSONFormatter class · python · L9-L21 (13 LOC)backend/app/logging_config.py
class JSONFormatter(logging.Formatter):
"""JSON log formatter for structured logging."""
def format(self, record: logging.LogRecord) -> str:
log_entry: dict[str, object] = {
"timestamp": datetime.now(UTC).isoformat(),
"level": record.levelname,
"logger": record.name,
"message": record.getMessage(),
}
if record.exc_info and record.exc_info[0] is not None:
log_entry["exception"] = self.formatException(record.exc_info)
return json.dumps(log_entry)format method · python · L12-L21 (10 LOC)backend/app/logging_config.py
def format(self, record: logging.LogRecord) -> str:
log_entry: dict[str, object] = {
"timestamp": datetime.now(UTC).isoformat(),
"level": record.levelname,
"logger": record.name,
"message": record.getMessage(),
}
if record.exc_info and record.exc_info[0] is not None:
log_entry["exception"] = self.formatException(record.exc_info)
return json.dumps(log_entry)setup_logging function · python · L24-L37 (14 LOC)backend/app/logging_config.py
def setup_logging(level: str = "INFO") -> None:
    """Install a single JSON-formatting stdout handler on the root logger.

    ``level`` is a logging level name (case-insensitive); an unrecognised
    name falls back to ``INFO``. Any handlers already attached to the root
    logger are removed first.
    """
    numeric_level = getattr(logging, level.upper(), logging.INFO)

    stdout_handler = logging.StreamHandler(sys.stdout)
    stdout_handler.setFormatter(JSONFormatter())

    root_logger = logging.getLogger()
    root_logger.setLevel(numeric_level)
    root_logger.handlers.clear()
    root_logger.addHandler(stdout_handler)

    # Quiet noisy libraries
    for noisy_name in ("uvicorn.access", "chromadb", "httpx"):
        logging.getLogger(noisy_name).setLevel(logging.WARNING)
# Next listing: lifespan function · python · L25-L45 (21 LOC) backend/app/main.py
async def lifespan(app: FastAPI) -> AsyncGenerator[None]:
    """Create the ChromaDB client and probe Ollama before the app serves.

    Stores ``chroma_client`` and ``ollama_reachable`` on ``app.state``.
    Any failure while probing Ollama is treated as "not reachable" rather
    than aborting startup.
    """
    logger.info("Starting AI Helpdesk Assistant backend v%s", settings.version)

    state = app.state
    state.chroma_client = chromadb.PersistentClient(path=settings.chroma_path)
    logger.info("ChromaDB initialized at %s", settings.chroma_path)

    reachable = False
    try:
        async with httpx.AsyncClient() as client:
            probe = await client.get(f"{settings.ollama_base_url}/api/tags", timeout=5.0)
            reachable = probe.status_code == 200
    except Exception:
        # Best-effort probe: startup continues without Ollama.
        reachable = False
    state.ollama_reachable = reachable

    if not reachable:
        logger.warning("Ollama not reachable at %s", settings.ollama_base_url)
    else:
        logger.info("Ollama reachable at %s", settings.ollama_base_url)
    yield
# Next listing: create_app function · python · L48-L86 (39 LOC) backend/app/main.py
def create_app() -> FastAPI:
app = FastAPI(
title="AI Helpdesk Assistant Backend",
version=settings.version,
lifespan=lifespan,
)
# Middleware is applied in reverse order (last added = outermost wrapper)
# Execution order for a request: APIToken → RateLimit → SizeLimit → CORS → SecurityHeaders → router
app.add_middleware(SecurityHeadersMiddleware)
app.add_middleware(
CORSMiddleware,
allow_origins=[settings.cors_origin],
allow_credentials=False,
allow_methods=["GET", "POST"],
allow_headers=["Content-Type", "X-Extension-Token"],
)
app.add_middleware(
RequestSizeLimitMiddleware,
max_bytes=settings.max_request_bytes,
exempt_paths={"/ingest/upload"},
)
app.add_middleware(
RateLimitMiddleware,
max_per_minute=settings.rate_limit_per_minute,
)
app.add_middleware(APITokenMiddleware)
app.include_router(health.router)
app.include_roIf a scraper extracted this row, it came from Repobility (https://repobility.com)
APITokenMiddleware class · python · L31-L64 (34 LOC)backend/app/middleware/security.py
class APITokenMiddleware(BaseHTTPMiddleware):
"""
Validates the X-Extension-Token header on all non-health requests.
The extension sends a shared secret configured in both:
- Backend: API_TOKEN env var (required in production)
- Extension: stored in chrome.storage.local (never synced)
/health is exempt so operators can monitor without the token.
"""
EXEMPT_PATHS = {"/health", "/docs", "/openapi.json"}
def __init__(self, app: ASGIApp) -> None:
super().__init__(app)
self._token = settings.api_token
async def dispatch(self, request: Request, call_next: Callable[[Request], Awaitable[Response]]) -> Response:
if request.url.path in self.EXEMPT_PATHS:
return await call_next(request)
# If no token is configured, skip auth (dev mode only)
if not self._token:
return await call_next(request)
provided = request.headers.get("X-Extension-Token", "")
if not provided or no__init__ method · python · L44-L46 (3 LOC)backend/app/middleware/security.py
def __init__(self, app: ASGIApp) -> None:
    """Wrap *app* and capture the configured API token."""
    super().__init__(app)
    # Read once at construction; an empty value disables the check in dispatch().
    configured_token = settings.api_token
    self._token = configured_token
# Next listing: dispatch method · python · L48-L64 (17 LOC) backend/app/middleware/security.py
async def dispatch(self, request: Request, call_next: Callable[[Request], Awaitable[Response]]) -> Response:
    """Reject requests that lack a valid ``X-Extension-Token`` header.

    Requests pass through untouched when the path is in ``EXEMPT_PATHS``,
    when no token is configured, or when the method is ``OPTIONS``.
    Everything else must present a token equal to the configured one,
    otherwise a 401 JSON response is returned and the failure is logged.
    """
    if request.url.path in self.EXEMPT_PATHS:
        return await call_next(request)
    # If no token is configured, skip auth (dev mode only)
    if not self._token:
        return await call_next(request)
    # CORS preflight requests never carry custom headers, so they cannot
    # present the token; let them through to the CORS layer instead of
    # answering 401 (which would make the browser block the real request).
    if request.method == "OPTIONS":
        return await call_next(request)

    provided = request.headers.get("X-Extension-Token", "")
    # compare_digest() raises TypeError for str arguments containing
    # non-ASCII characters; comparing encoded bytes keeps the check
    # constant-time and turns such input into a normal 401.
    token_ok = bool(provided) and secrets.compare_digest(
        provided.encode("utf-8"), self._token.encode("utf-8")
    )
    if not token_ok:
        client_host = request.client.host if request.client else "unknown"
        logger.warning("Auth failure on %s from %s", request.url.path, client_host)
        return JSONResponse(
            status_code=401,
            content={"detail": "Unauthorized. Missing or invalid X-Extension-Token header."},
        )
    return await call_next(request)
# Next listing: RateLimitMiddleware class · python · L74-L145 (72 LOC) backend/app/middleware/security.py
class RateLimitMiddleware(BaseHTTPMiddleware):
"""
Simple in-process rate limiter.
Limits /generate and /ingest/upload to configurable requests per client IP.
Memory management: a periodic sweep (at most once per window) evicts entries
for IPs whose most-recent request is older than the window. This prevents
unbounded growth of ``_counts`` when the server receives traffic from many
distinct source IPs over time.
"""
RATE_LIMITED_PATHS = {"/generate", "/ingest/upload"}
def __init__(self, app: ASGIApp, max_per_minute: int = 20) -> None:
super().__init__(app)
self._max = max_per_minute
self._window = 60.0 # seconds
# Keyed by "{path}:{ip}" to track per-path, per-IP rate limits
self._counts: dict[str, list[float]] = defaultdict(list)
self._lock = asyncio.Lock()
self._last_sweep: float = 0.0 # monotonic timestamp of the most-recent cleanup sweep
self._path_limits: dict[str, int] = {__init__ method · python · L87-L98 (12 LOC)backend/app/middleware/security.py
def __init__(self, app: ASGIApp, max_per_minute: int = 20) -> None:
    """Set up the per-path limits and the empty request-timestamp store.

    ``max_per_minute`` is the limit applied to ``/generate`` and the
    fallback limit; ``/ingest/upload`` uses ``INGEST_RATE_LIMIT``.
    """
    super().__init__(app)
    self._window = 60.0  # seconds
    self._max = max_per_minute
    self._path_limits: dict[str, int] = dict(
        [
            ("/generate", max_per_minute),
            ("/ingest/upload", INGEST_RATE_LIMIT),
        ]
    )
    # Keyed by "{path}:{ip}" to track per-path, per-IP rate limits
    self._counts: dict[str, list[float]] = defaultdict(list)
    # monotonic timestamp of the most-recent cleanup sweep
    self._last_sweep: float = 0.0
    self._lock = asyncio.Lock()
# Next listing: _evict_stale_entries method · python · L100-L114 (15 LOC) backend/app/middleware/security.py
def _evict_stale_entries(self, now: float) -> None:
"""Remove IPs whose most-recent request falls outside the current window.
Must be called while ``self._lock`` is held. A full sweep is performed
at most once per window period so that the amortised cost per request is
negligible even under high load.
"""
if now - self._last_sweep < self._window:
return
stale_ips = [ip for ip, ts in self._counts.items() if not ts or now - ts[-1] >= self._window]
for ip in stale_ips:
del self._counts[ip]
self._last_sweep = nowdispatch method · python · L116-L145 (30 LOC)backend/app/middleware/security.py
async def dispatch(self, request: Request, call_next: Callable[[Request], Awaitable[Response]]) -> Response:
if request.url.path not in self.RATE_LIMITED_PATHS:
return await call_next(request)
client_ip = request.client.host if request.client else "unknown"
rate_key = f"{request.url.path}:{client_ip}"
limit = self._path_limits.get(request.url.path, self._max)
now = time.monotonic()
async with self._lock:
# Periodic cleanup: evict stale IP entries to bound memory usage.
self._evict_stale_entries(now)
timestamps = self._counts[rate_key]
# Remove timestamps outside the window
self._counts[rate_key] = [t for t in timestamps if now - t < self._window]
if len(self._counts[rate_key]) >= limit:
logger.warning("Rate limit exceeded for %s on %s", client_ip, request.url.path)
return JSONResponse(
status_codeRequestSizeLimitMiddleware class · python · L152-L189 (38 LOC)backend/app/middleware/security.py
class RequestSizeLimitMiddleware(BaseHTTPMiddleware):
"""
Rejects requests whose body exceeds max_bytes.
Prevents oversized payloads from being forwarded to Ollama.
Default: 64 KB — sufficient for the largest reasonable ticket description.
"""
def __init__(
self,
app: ASGIApp,
max_bytes: int = 65_536,
exempt_paths: set[str] | None = None,
) -> None:
super().__init__(app)
self._max_bytes = max_bytes
self._exempt_paths = exempt_paths or set()
async def dispatch(self, request: Request, call_next: Callable[[Request], Awaitable[Response]]) -> Response:
if request.url.path in self._exempt_paths:
return await call_next(request)
content_length = request.headers.get("content-length")
if content_length:
if int(content_length) > self._max_bytes:
return self._too_large()
elif request.method in {"POST", "PUT", "PATCH"}:
body = Repobility · open methodology · https://repobility.com/research/
__init__ method · python · L159-L167 (9 LOC)backend/app/middleware/security.py
def __init__(
    self,
    app: ASGIApp,
    max_bytes: int = 65_536,
    exempt_paths: set[str] | None = None,
) -> None:
    """Store the byte limit and the set of paths that bypass it.

    A missing or empty ``exempt_paths`` is replaced by a fresh empty set.
    """
    super().__init__(app)
    self._exempt_paths = exempt_paths if exempt_paths else set()
    self._max_bytes = max_bytes
# Next listing: dispatch method · python · L169-L180 (12 LOC) backend/app/middleware/security.py
async def dispatch(self, request: Request, call_next: Callable[[Request], Awaitable[Response]]) -> Response:
    """Reject requests whose body is larger than ``self._max_bytes``.

    Paths in ``self._exempt_paths`` are passed through unchecked. When a
    ``Content-Length`` header is present its declared value is compared
    against the limit; a value that is not a non-negative integer yields a
    400 response instead of an unhandled ``ValueError``. Without the header,
    POST/PUT/PATCH bodies are read and measured. Oversized requests get the
    413 response built by ``_too_large()``.
    """
    if request.url.path in self._exempt_paths:
        return await call_next(request)

    content_length = request.headers.get("content-length")
    if content_length:
        try:
            declared_size = int(content_length)
        except ValueError:
            declared_size = -1  # handled as malformed below
        if declared_size < 0:
            return JSONResponse(
                status_code=400,
                content={
                    "detail": "Invalid Content-Length header.",
                    "error_code": "INVALID_CONTENT_LENGTH",
                },
            )
        if declared_size > self._max_bytes:
            return self._too_large()
    elif request.method in {"POST", "PUT", "PATCH"}:
        # No declared length: the only way to know the size is to read the body.
        body = await request.body()
        if len(body) > self._max_bytes:
            return self._too_large()
    return await call_next(request)
# Next listing: _too_large method · python · L182-L189 (8 LOC) backend/app/middleware/security.py
def _too_large(self) -> JSONResponse:
    """Build the 413 JSON response that reports the configured byte limit."""
    limit = self._max_bytes
    payload = {
        "detail": f"Request body too large. Max {limit} bytes.",
        "error_code": "PAYLOAD_TOO_LARGE",
    }
    return JSONResponse(status_code=413, content=payload)
# Next listing: SecurityHeadersMiddleware class · python · L196-L208 (13 LOC) backend/app/middleware/security.py
class SecurityHeadersMiddleware(BaseHTTPMiddleware):
    """Stamps a fixed set of defensive HTTP headers onto every response."""

    async def dispatch(self, request: Request, call_next: Callable[[Request], Awaitable[Response]]) -> Response:
        """Run the downstream app, then set the hardening headers on its response.

        Existing values for these headers are overwritten, and a ``server``
        header is removed when present.
        """
        outgoing = await call_next(request)
        hardening = {
            "X-Content-Type-Options": "nosniff",
            "X-Frame-Options": "DENY",
            "Cache-Control": "no-store",
            "Referrer-Policy": "no-referrer",
        }
        for header_name, header_value in hardening.items():
            outgoing.headers[header_name] = header_value
        # Remove server version banner
        if "server" in outgoing.headers:
            del outgoing.headers["server"]
        return outgoing
# Next listing: dispatch method · python · L199-L208 (10 LOC) backend/app/middleware/security.py
async def dispatch(self, request: Request, call_next: Callable[[Request], Awaitable[Response]]) -> Response:
    """Call the downstream app and add the hardening headers to its response.

    Overwrites any existing value of the four headers and drops a
    ``server`` header if the response has one.
    """
    outgoing = await call_next(request)
    for header_name, header_value in (
        ("X-Content-Type-Options", "nosniff"),
        ("X-Frame-Options", "DENY"),
        ("Cache-Control", "no-store"),
        ("Referrer-Policy", "no-referrer"),
    ):
        outgoing.headers[header_name] = header_value
    # Remove server version banner
    if "server" in outgoing.headers:
        del outgoing.headers["server"]
    return outgoing
# Next listing: GenerateRequest class · python · L11-L26 (16 LOC) backend/app/models/request_models.py
class GenerateRequest(BaseModel):
    """Request body for reply generation: ticket fields plus generation options.

    Every field has a default, so an empty JSON object validates. String
    fields are length-capped; ``max_context_docs`` must be within 1..20.
    """

    # Ticket content
    ticket_subject: str = Field(default="", max_length=_SUBJECT_MAX, description="Ticket subject line")
    ticket_description: str = Field(default="", max_length=_DESCRIPTION_MAX, description="Full problem description")

    # Ticket metadata
    requester_name: str = Field(default="", max_length=_SHORT_FIELD_MAX, description="Requester's name")
    category: str = Field(default="", max_length=_SHORT_FIELD_MAX, description="WHD ticket category")
    status: str = Field(default="", max_length=_SHORT_FIELD_MAX, description="WHD ticket status")

    # Generation options
    model: str = Field(default="llama3.2:3b", max_length=_MODEL_MAX, description="Ollama model to use")
    max_context_docs: int = Field(default=5, ge=1, le=20)
    stream: bool = Field(default=False)
    prompt_suffix: str = Field(default="", max_length=2000, description="Custom instructions appended to the prompt")
# Next listing: ContextDoc class · python · L4-L8 (5 LOC) backend/app/models/response_models.py
class ContextDoc(BaseModel):
    """One retrieved context document together with its relevance score."""

    content: str
    source: str  # "kb" | "ticket"
    score: float
    metadata: dict[str, object]
# Next listing: GenerateResponse class · python · L11-L15 (5 LOC) backend/app/models/response_models.py
class GenerateResponse(BaseModel):
    """Response body carrying the generated reply and the context used for it."""

    reply: str
    model_used: str
    context_docs: list[ContextDoc]
    latency_ms: int
# Generated by Repobility's multi-pass static-analysis pipeline (https://repobility.com)
IngestUploadResponse class · python · L18-L23 (6 LOC)backend/app/models/response_models.py
class IngestUploadResponse(BaseModel):
    """Response body describing the outcome of a single file ingestion."""

    filename: str
    collection: str
    chunks_ingested: int
    processing_time_ms: int
    # Optional note for the caller, e.g., "No text content extracted"
    warning: str | None = None
# Next listing: generate_reply function · python · L17-L72 (56 LOC) backend/app/routers/generate.py
async def generate_reply(body: GenerateRequest, request: Request) -> GenerateResponse:
"""Retrieve RAG context and generate a helpdesk reply via Ollama."""
chroma_client = request.app.state.chroma_client
rag = RAGService(chroma_client=chroma_client)
llm = LLMService()
logger.info("Generate request: model=%s subject=%s", body.model, body.ticket_subject[:80])
start = time.monotonic()
# Retrieve context
query = f"{body.ticket_subject}\n\n{body.ticket_description}".strip()
try:
context_docs = await rag.retrieve(
query=query or "general helpdesk inquiry",
max_docs=body.max_context_docs,
)
except ConnectionError as exc:
raise HTTPException(
status_code=503,
detail={
"detail": str(exc),
"error_code": "OLLAMA_DOWN",
},
) from exc
# Build prompt
context_text = "\n\n---\n\n".join(
f"[{doc.source.upper()} | score: {d_build_prompt function · python · L75-L94 (20 LOC)backend/app/routers/generate.py
def _build_prompt(body: GenerateRequest, context_text: str) -> str:
    """Render the LLM prompt for a ticket and its retrieved context.

    Args:
        body: Ticket fields; subject, description, requester name, category
            and status are interpolated into the prompt.
        context_text: Pre-formatted retrieved context. When empty, a fixed
            "no relevant context" sentence is inserted instead.

    Returns:
        The complete prompt text, ending with the ``REPLY:`` cue.
    """
    # An empty subject or description is shown as a placeholder rather than a blank line.
    subject = body.ticket_subject or "(not available)"
    description = body.ticket_description or "(not available)"
    return f"""You are a helpful IT helpdesk assistant. Answer the technician's query based on the ticket context and retrieved knowledge below.
TICKET
Subject: {subject}
Requester: {body.requester_name} | Category: {body.category} | Status: {body.status}
Description: {description}
RELEVANT CONTEXT
{context_text if context_text else "No relevant context found in knowledge base."}
INSTRUCTIONS
- Write a professional, empathetic reply to the requester
- Reference specific steps from the knowledge base when available
- Keep the reply concise (under 200 words unless complexity demands more)
- Do not hallucinate software versions or ticket numbers
REPLY:"""
# Next listing: health_check function · python · L16-L43 (28 LOC) backend/app/routers/health.py
async def health_check(request: Request) -> dict[str, object]:
"""Return system health status including Ollama and ChromaDB state."""
ollama_reachable = False
try:
async with httpx.AsyncClient() as client:
resp = await client.get(f"{settings.ollama_base_url}/api/tags", timeout=5.0)
ollama_reachable = resp.status_code == 200
except Exception:
ollama_reachable = False
chroma_ready = False
chroma_doc_counts: dict[str, int] = {}
try:
chroma_client = request.app.state.chroma_client
collections = chroma_client.list_collections()
chroma_ready = True
for col in collections:
chroma_doc_counts[col.name] = col.count()
except Exception:
chroma_ready = False
return {
"status": "ok" if ollama_reachable and chroma_ready else "degraded",
"ollama_reachable": ollama_reachable,
"chroma_ready": chroma_ready,
"chroma_doc_counts": chroma_doc_countsshutdown_backend function · python · L47-L55 (9 LOC)backend/app/routers/health.py
async def shutdown_backend() -> dict[str, str]:
    """Schedule a SIGTERM to this process and return immediately.

    The signal is sent 0.5 seconds later so the HTTP response can be
    delivered first.

    Returns:
        ``{"status": "shutting_down"}``.
    """
    # Scheduled with call_later() rather than a fire-and-forget task: the
    # event loop keeps a strong reference to scheduled callbacks, whereas an
    # unreferenced task may be garbage-collected before it runs.
    asyncio.get_running_loop().call_later(0.5, os.kill, os.getpid(), signal.SIGTERM)
    return {"status": "shutting_down"}
# Next listing: start_ollama function · python · L59-L83 (25 LOC) backend/app/routers/health.py
async def start_ollama() -> dict[str, str]:
    """Launch ``ollama serve`` in the background unless Ollama already answers.

    Returns ``{"status": "already_running"}`` when the tags endpoint replies
    with HTTP 200, otherwise spawns the process with its output discarded
    and returns ``{"status": "starting"}`` without waiting for it.
    """
    # Check if already running; any probe failure just means "go ahead and start".
    try:
        async with httpx.AsyncClient() as client:
            probe = await client.get(f"{settings.ollama_base_url}/api/tags", timeout=3.0)
            if probe.status_code == 200:
                return {"status": "already_running"}
    except Exception:
        pass

    spawn_options = {
        "stdout": subprocess.DEVNULL,
        "stderr": subprocess.DEVNULL,
    }
    if sys.platform == "win32":
        # Suppress the console window on Windows.
        spawn_options["creationflags"] = subprocess.CREATE_NO_WINDOW
    subprocess.Popen(["ollama", "serve"], **spawn_options)
    return {"status": "starting"}
# Next listing: stop_ollama function · python · L87-L104 (18 LOC) backend/app/routers/health.py
async def stop_ollama() -> dict[str, str]:
    """Kill Ollama processes and report ``{"status": "stopping"}``.

    On Windows, ``taskkill /F`` is run for ``ollama.exe`` and
    ``ollama_llama_server.exe``; elsewhere ``pkill -f ollama`` is used.
    Non-zero exit codes from these commands are ignored.
    """
    if sys.platform != "win32":
        subprocess.run(["pkill", "-f", "ollama"], check=False)
        return {"status": "stopping"}

    for image_name in ("ollama.exe", "ollama_llama_server.exe"):
        subprocess.run(
            ["taskkill", "/IM", image_name, "/F"],
            stdout=subprocess.DEVNULL,
            stderr=subprocess.DEVNULL,
            check=False,
        )
    return {"status": "stopping"}
# Next listing: upload_file function · python · L34-L136 (103 LOC) backend/app/routers/ingest.py
async def upload_file(request: Request, file: UploadFile) -> IngestUploadResponse:
"""Upload a single file for ingestion into ChromaDB."""
# Validate filename
if not file.filename:
return JSONResponse( # type: ignore[return-value]
status_code=422,
content={"detail": "No filename provided."},
)
# Sanitize filename (strip directory components)
safe_name = PurePosixPath(file.filename).name
suffix = Path(safe_name).suffix.lower()
if suffix not in ALLOWED_EXTENSIONS:
return JSONResponse( # type: ignore[return-value]
status_code=422,
content={
"detail": (
f"Unsupported file type: {suffix}. "
f"Allowed: {', '.join(sorted(ALLOWED_EXTENSIONS))}"
),
},
)
# Try to acquire the upload semaphore (non-blocking)
if not _upload_semaphore._value: # noqa: SLF001
return JSONResponse( # type: igMethodology: Repobility · https://repobility.com/research/state-of-ai-code-2026/
clear_collection function · python · L140-L161 (22 LOC)backend/app/routers/ingest.py
async def clear_collection(request: Request, name: str) -> dict[str, str]:
    """Delete the named ChromaDB collection.

    Names outside ``ALLOWED_COLLECTIONS`` produce a 404 JSON response that
    lists the allowed names. A ``ValueError`` from the delete call is
    ignored, so clearing an absent collection still reports success.
    """
    if name in ALLOWED_COLLECTIONS:
        client = request.app.state.chroma_client
        try:
            await asyncio.to_thread(client.delete_collection, name)
        except ValueError:
            # Collection doesn't exist — that's fine, idempotent
            pass
        return {"status": "ok", "collection": name}

    allowed_names = ", ".join(sorted(ALLOWED_COLLECTIONS))
    return JSONResponse(  # type: ignore[return-value]
        status_code=404,
        content={"detail": f"Unknown collection: {name}. Allowed: {allowed_names}"},
    )
# Next listing: _stream_to_temp function · python · L164-L195 (32 LOC) backend/app/routers/ingest.py
async def _stream_to_temp(
    file: UploadFile, safe_name: str, suffix: str,
) -> Path:
    """Stream an uploaded file to a temp file, enforcing the upload size limit.

    Args:
        file: The upload to read, ``_CHUNK_SIZE`` bytes at a time.
        safe_name: Sanitized file name, embedded in the temp file prefix.
        suffix: File extension given to the temp file.

    Returns:
        Path of the closed temp file; the caller owns it and must delete it.

    Raises:
        _PayloadTooLargeError: When more than ``settings.max_upload_bytes``
            bytes are received.

    On any failure, including task cancellation, the temp file is closed
    and removed before the exception propagates.
    """
    max_bytes = settings.max_upload_bytes
    written = 0
    tmp = tempfile.NamedTemporaryFile(
        delete=False, suffix=suffix, prefix=f"ingest_{safe_name}_",
    )
    tmp_path = Path(tmp.name)
    try:
        # The context manager closes the handle on every exit path.
        with tmp:
            while chunk := await file.read(_CHUNK_SIZE):
                written += len(chunk)
                if written > max_bytes:
                    raise _PayloadTooLargeError(max_bytes)
                tmp.write(chunk)
    except BaseException:
        # BaseException so that asyncio.CancelledError (client disconnect,
        # shutdown) does not leave a partial file behind.
        _cleanup_temp(tmp_path)
        raise
    return tmp_path
# Next listing: _PayloadTooLargeError class · python · L198-L203 (6 LOC) backend/app/routers/ingest.py
class _PayloadTooLargeError(Exception):
"""Raised when streamed upload exceeds size limit."""
def __init__(self, max_bytes: int) -> None:
self.max_bytes = max_bytes
super().__init__(f"File exceeds maximum upload size of {max_bytes} bytes.")__init__ method · python · L201-L203 (3 LOC)backend/app/routers/ingest.py
def __init__(self, max_bytes: int) -> None:
self.max_bytes = max_bytes
super().__init__(f"File exceeds maximum upload size of {max_bytes} bytes.")_cleanup_temp function · python · L206-L215 (10 LOC)backend/app/routers/ingest.py
def _cleanup_temp(path: Path, retries: int = 3, delay: float = 0.5) -> None:
"""Remove temp file with retries for Windows AV locks."""
for attempt in range(retries):
try:
path.unlink(missing_ok=True)
return
except OSError:
if attempt < retries - 1:
time.sleep(delay)
logger.warning("Could not delete temp file: %s", path)list_models function · python · L12-L25 (14 LOC)backend/app/routers/models.py
async def list_models() -> dict[str, list[str]]:
    """Return the model names reported by Ollama's ``/api/tags`` endpoint.

    Raises:
        HTTPException: 503 with ``error_code`` ``OLLAMA_DOWN`` when the
            request fails, returns an error status, or the payload cannot
            be decoded or lacks a model ``name``.
    """
    tags_url = f"{settings.ollama_base_url}/api/tags"
    try:
        async with httpx.AsyncClient() as client:
            tags_response = await client.get(tags_url, timeout=10.0)
            tags_response.raise_for_status()
            payload = tags_response.json()
            names = [entry["name"] for entry in payload.get("models", [])]
    except (httpx.HTTPError, json.JSONDecodeError, KeyError) as exc:
        raise HTTPException(
            status_code=503,
            detail={"detail": "Cannot reach Ollama to list models.", "error_code": "OLLAMA_DOWN"},
        ) from exc
    return {"models": names}
# Next listing: EmbedService class · python · L15-L72 (58 LOC) backend/app/services/embed_service.py
class EmbedService:
"""Generates embeddings using nomic-embed-text via Ollama."""
def __init__(self, model: str = "nomic-embed-text") -> None:
self.model = model
self.base_url = settings.ollama_base_url
async def embed(self, text: str) -> list[float]:
"""Return the embedding vector with retry logic."""
last_error: ConnectionError | None = None
for attempt in range(1 + MAX_RETRIES):
try:
result = await asyncio.to_thread(self._embed_sync, text)
if attempt > 0:
logger.info("Ollama embed succeeded on attempt %d", attempt + 1)
return result
except ConnectionError as exc:
last_error = exc
if attempt < MAX_RETRIES:
logger.warning(
"Ollama embed attempt %d failed, retrying in %.1fs: %s",
attempt + 1, RETRY_DELAY, exc,
)
__init__ method · python · L18-L20 (3 LOC)backend/app/services/embed_service.py
def __init__(self, model: str = "nomic-embed-text") -> None:
    """Remember the embedding model name and the Ollama base URL from settings."""
    self.base_url = settings.ollama_base_url
    self.model = model
# If a scraper extracted this row, it came from Repobility (https://repobility.com)
embed method · python · L22-L40 (19 LOC)backend/app/services/embed_service.py
async def embed(self, text: str) -> list[float]:
    """Return the embedding vector for *text*, retrying on connection errors.

    The blocking ``_embed_sync`` call runs in a worker thread. Up to
    ``1 + MAX_RETRIES`` attempts are made, sleeping ``RETRY_DELAY`` seconds
    between them.

    Raises:
        ConnectionError: The error from the last attempt, once all
            attempts have failed.
    """
    total_attempts = 1 + MAX_RETRIES
    failure: ConnectionError | None = None
    for attempt_no in range(1, total_attempts + 1):
        try:
            vector = await asyncio.to_thread(self._embed_sync, text)
        except ConnectionError as exc:
            failure = exc
            if attempt_no < total_attempts:
                logger.warning(
                    "Ollama embed attempt %d failed, retrying in %.1fs: %s",
                    attempt_no, RETRY_DELAY, exc,
                )
                await asyncio.sleep(RETRY_DELAY)
            continue
        if attempt_no > 1:
            logger.info("Ollama embed succeeded on attempt %d", attempt_no)
        return vector
    logger.error("Ollama embed failed after %d attempts", total_attempts)
    raise failure  # type: ignore[misc]
# Next listing: _embed_sync method · python · L42-L72 (31 LOC) backend/app/services/embed_service.py
def _embed_sync(self, text: str) -> list[float]:
try:
with httpx.Client() as client:
resp = client.post(
f"{self.base_url}/api/embeddings",
json={"model": self.model, "prompt": text},
timeout=30.0,
)
resp.raise_for_status()
data = resp.json()
if "embedding" not in data:
raise ConnectionError(
f"Ollama embed response missing 'embedding' key: {list(data.keys())}"
)
return list(data["embedding"])
except httpx.ConnectError as exc:
raise ConnectionError(
f"Ollama embed service unreachable at {self.base_url}"
) from exc
except httpx.TimeoutException as exc:
raise ConnectionError(
f"Ollama embed request timed out at {self.base_url}"
) from exc
except LLMService class · python · L15-L66 (52 LOC)backend/app/services/llm_service.py
class LLMService:
"""Generates text completions via Ollama."""
def __init__(self) -> None:
self.base_url = settings.ollama_base_url
async def generate(self, prompt: str, model: str) -> str:
"""Generate a completion with retry logic. Raises ConnectionError if Ollama is unreachable."""
last_error: ConnectionError | None = None
for attempt in range(1 + MAX_RETRIES):
try:
result = await asyncio.to_thread(self._generate_sync, prompt, model)
if attempt > 0:
logger.info("Ollama generate succeeded on attempt %d", attempt + 1)
return result
except ConnectionError as exc:
last_error = exc
if attempt < MAX_RETRIES:
logger.warning(
"Ollama generate attempt %d failed, retrying in %.1fs: %s",
attempt + 1, RETRY_DELAY, exc,
)
agenerate method · python · L21-L39 (19 LOC)backend/app/services/llm_service.py
async def generate(self, prompt: str, model: str) -> str:
    """Return a completion for *prompt* from *model*, retrying on connection errors.

    The blocking ``_generate_sync`` call runs in a worker thread. Up to
    ``1 + MAX_RETRIES`` attempts are made, sleeping ``RETRY_DELAY`` seconds
    between them.

    Raises:
        ConnectionError: The error from the last attempt, once all
            attempts have failed.
    """
    total_attempts = 1 + MAX_RETRIES
    failure: ConnectionError | None = None
    for attempt_no in range(1, total_attempts + 1):
        try:
            completion = await asyncio.to_thread(self._generate_sync, prompt, model)
        except ConnectionError as exc:
            failure = exc
            if attempt_no < total_attempts:
                logger.warning(
                    "Ollama generate attempt %d failed, retrying in %.1fs: %s",
                    attempt_no, RETRY_DELAY, exc,
                )
                await asyncio.sleep(RETRY_DELAY)
            continue
        if attempt_no > 1:
            logger.info("Ollama generate succeeded on attempt %d", attempt_no)
        return completion
    logger.error("Ollama generate failed after %d attempts", total_attempts)
    raise failure  # type: ignore[misc]
# Next listing: _generate_sync method · python · L41-L66 (26 LOC) backend/app/services/llm_service.py
def _generate_sync(self, prompt: str, model: str) -> str:
try:
with httpx.Client() as client:
resp = client.post(
f"{self.base_url}/api/generate",
json={"model": model, "prompt": prompt, "stream": False},
timeout=120.0,
)
resp.raise_for_status()
return str(resp.json()["response"])
except httpx.ConnectError as exc:
raise ConnectionError(
f"Ollama service unreachable at {self.base_url}"
) from exc
except httpx.TimeoutException as exc:
raise ConnectionError(
f"Ollama request timed out after 120s at {self.base_url}"
) from exc
except httpx.HTTPStatusError as exc:
raise ConnectionError(
f"Ollama returned error {exc.response.status_code}"
) from exc
except (json.JSONDecodeError, KeyError) as exc:
RAGService class · python · L10-L73 (64 LOC)backend/app/services/rag_service.py
class RAGService:
"""Retrieves relevant documents from ChromaDB using semantic search."""
TICKET_COLLECTION = "whd_tickets"
KB_COLLECTION = "kb_articles"
def __init__(self, chroma_client: ClientAPI) -> None:
self.client = chroma_client
self.embed_svc = EmbedService()
async def retrieve(self, query: str, max_docs: int = 5) -> list[ContextDoc]:
"""Embed query, search both collections, merge and rank by score."""
embedding = await self.embed_svc.embed(query)
ticket_docs, kb_docs = await asyncio.gather(
self._query_collection(self.TICKET_COLLECTION, embedding, max_docs // 2 + 1),
self._query_collection(self.KB_COLLECTION, embedding, max_docs - max_docs // 2),
)
all_docs = ticket_docs + kb_docs
all_docs.sort(key=lambda d: d.score, reverse=True)
return all_docs[:max_docs]
async def _query_collection(
self, name: str, embedding: list[float], n_results: int
__init__ method · python · L16-L18 (3 LOC)backend/app/services/rag_service.py
def __init__(self, chroma_client: ClientAPI) -> None:
    """Keep the ChromaDB client and create the embedding service used for queries."""
    self.embed_svc = EmbedService()
    self.client = chroma_client
# Next listing: retrieve method · python · L20-L31 (12 LOC) backend/app/services/rag_service.py
async def retrieve(self, query: str, max_docs: int = 5) -> list[ContextDoc]:
    """Return up to ``max_docs`` documents for *query*, best score first.

    The query is embedded once, then the ticket and KB collections are
    searched concurrently. The ticket search asks for
    ``max_docs // 2 + 1`` results and the KB search for
    ``max_docs - max_docs // 2``; the merged list is sorted by score and
    truncated.
    """
    query_vector = await self.embed_svc.embed(query)

    ticket_quota = max_docs // 2 + 1
    kb_quota = max_docs - max_docs // 2
    ticket_hits, kb_hits = await asyncio.gather(
        self._query_collection(self.TICKET_COLLECTION, query_vector, ticket_quota),
        self._query_collection(self.KB_COLLECTION, query_vector, kb_quota),
    )

    ranked = sorted(ticket_hits + kb_hits, key=lambda doc: doc.score, reverse=True)
    return ranked[:max_docs]
# Repobility · open methodology · https://repobility.com/research/
_query_collection method · python · L33-L36 (4 LOC)backend/app/services/rag_service.py
async def _query_collection(
    self, name: str, embedding: list[float], n_results: int
) -> list[ContextDoc]:
    """Run the blocking ``_query_sync`` lookup in a worker thread and return its result."""
    matches = await asyncio.to_thread(self._query_sync, name, embedding, n_results)
    return matches
# Next listing: _query_sync method · python · L38-L73 (36 LOC) backend/app/services/rag_service.py
def _query_sync(
self, name: str, embedding: list[float], n_results: int
) -> list[ContextDoc]:
try:
col = self.client.get_collection(name)
except Exception:
return []
count = col.count()
if count == 0:
return []
n_results = min(n_results, count)
results: Any = col.query(
query_embeddings=[embedding], # type: ignore[arg-type]
n_results=n_results,
include=["documents", "metadatas", "distances"],
)
docs = []
for content, meta, distance in zip(
results["documents"][0],
results["metadatas"][0],
results["distances"][0],
):
source = "ticket" if name == self.TICKET_COLLECTION else "kb"
score = max(0.0, 1.0 - float(distance))
docs.append(
ContextDoc(
content=content,
source=source,
chunk_by_tokens function · python · L4-L25 (22 LOC)backend/app/utils/chunker.py
def chunk_by_tokens(
    text: str, max_tokens: int = 500, overlap_tokens: int = 50
) -> list[str]:
    """Split *text* into overlapping chunks of whitespace-separated words.

    Words stand in for tokens. Each chunk holds at most ``max_tokens``
    words, and consecutive chunks share ``overlap_tokens`` words.

    Args:
        text: Text to split; empty or whitespace-only text yields ``[]``.
        max_tokens: Maximum words per chunk; must be at least 1.
        overlap_tokens: Words repeated from the end of one chunk at the
            start of the next. Values of ``max_tokens`` or more are treated
            as ``max_tokens - 1`` so each chunk still advances by one word.

    Returns:
        The chunks in order, each a single-space-joined string.

    Raises:
        ValueError: If ``max_tokens`` is less than 1.
    """
    if max_tokens < 1:
        raise ValueError("max_tokens must be at least 1")

    words = text.split()
    total = len(words)
    # Guarantee forward progress: an overlap >= max_tokens would otherwise
    # leave the window start unchanged (or move it backwards) and never end.
    step = max(1, max_tokens - overlap_tokens)

    chunks: list[str] = []
    for start in range(0, total, step):
        end = min(start + max_tokens, total)
        chunks.append(" ".join(words[start:end]))
        if end == total:
            # The tail is covered; a further window would only repeat it.
            break
    return chunks
# page 1 / 3 · next ›