Function bodies 1,000 total
summarize_conversation function · python · L277-L361 (85 LOC)api/routers/drafts.py
async def summarize_conversation(
summary_request: DraftSummaryRequest,
request: Request,
reader: ChatDBReader = Depends(get_imessage_reader),
) -> DraftSummaryResponse:
"""Summarize a conversation using AI.
Analyzes the specified number of messages from a conversation and generates
a concise summary with key points. Uses the local MLX model for processing.
**Rate Limiting:**
This endpoint is rate limited to 10 requests per minute to prevent
resource exhaustion from CPU-intensive model generation.
**What You Get:**
- A 1-2 sentence summary of the conversation
- 2-4 key points extracted from the discussion
- The date range of messages included in the summary
**Use Cases:**
- Catch up on a conversation you've missed
- Get a quick overview of a long group chat
- Find important decisions or action items
**Example Request:**
```json
{
"chat_id": "chat123456789",
"num_messages": 50
}
generate_smart_reply function · python · L411-L446 (36 LOC)api/routers/drafts.py
async def generate_smart_reply(
routed_request: RoutedReplyRequest,
request: Request,
reader: ChatDBReader = Depends(get_imessage_reader),
) -> RoutedReplyResponse:
"""Generate a smart routed reply using the simplified router.
Current behavior:
- Non-empty messages: generated reply path.
- Empty/invalid messages: clarify response.
- Similarity and mobilization data influence prompt quality and confidence.
Args:
routed_request: RoutedReplyRequest with chat_id and optional last_message
request: FastAPI request object (for rate limiting)
Returns:
RoutedReplyResponse with response, type, confidence, and metadata
"""
messages = await fetch_messages(reader, routed_request.chat_id, routed_request.context_messages)
ensure_messages_exist(routed_request.chat_id, messages)
last_message, thread_context, context_info = build_smart_reply_input(
messages=messages,
requested_last_message=routed_request.l_experiment_to_response function · python · L133-L143 (11 LOC)api/routers/experiments.py
def _experiment_to_response(exp: Any) -> ExperimentResponse:
    """Build the API response model for a single experiment.

    Args:
        exp: Experiment-like object exposing ``name``, ``enabled``,
            ``description``, ``created_at`` and an iterable ``variants``
            whose items expose ``id``, ``weight`` and ``config``.

    Returns:
        ExperimentResponse with one VariantConfigResponse per variant,
        in the order the variants are iterated.
    """

    def to_variant_model(variant: Any) -> VariantConfigResponse:
        # Copy the three variant fields one-to-one.
        return VariantConfigResponse(
            id=variant.id,
            weight=variant.weight,
            config=variant.config,
        )

    return ExperimentResponse(
        name=exp.name,
        enabled=exp.enabled,
        description=exp.description,
        created_at=exp.created_at,
        variants=list(map(to_variant_model, exp.variants)),
    )


# _results_to_response function · python · L146-L166 (21 LOC)api/routers/experiments.py
def _results_to_response(results: ExperimentResults) -> ExperimentResultsResponse:
    """Build the API response model for computed experiment results.

    Args:
        results: ExperimentResults carrying per-variant counters plus the
            overall outcome count, winner, significance flag and p-value.

    Returns:
        ExperimentResultsResponse with one VariantResultsResponse per
        entry in ``results.variants``.
    """

    def to_variant_results(variant_stats):
        # Field-by-field copy of one variant's counters.
        return VariantResultsResponse(
            variant_id=variant_stats.variant_id,
            total_impressions=variant_stats.total_impressions,
            sent_unchanged=variant_stats.sent_unchanged,
            sent_edited=variant_stats.sent_edited,
            dismissed=variant_stats.dismissed,
            regenerated=variant_stats.regenerated,
            conversion_rate=variant_stats.conversion_rate,
        )

    return ExperimentResultsResponse(
        experiment_name=results.experiment_name,
        variants=list(map(to_variant_results, results.variants)),
        total_outcomes=results.total_outcomes,
        winner=results.winner,
        is_significant=results.is_significant,
        p_value=results.p_value,
    )


# list_experiments function · python · L187-L209 (23 LOC)api/routers/experiments.py
def list_experiments(active_only: bool = False) -> ExperimentListResponse:
    """List all configured experiments.

    Returns every experiment with its configuration, or only the active
    (enabled) ones when requested.

    Args:
        active_only: If true, only return enabled experiments.

    Returns:
        ExperimentListResponse with the converted experiments and their count.
    """
    manager = _get_manager()
    # Pick the manager query once, then call it.
    fetch = manager.get_active_experiments if active_only else manager.get_experiments
    experiments = fetch()
    converted = [_experiment_to_response(item) for item in experiments]
    return ExperimentListResponse(
        experiments=converted,
        total=len(experiments),
    )


# get_experiment function · python · L221-L242 (22 LOC)api/routers/experiments.py
def get_experiment(name: str) -> ExperimentResponse:
    """Get details of a specific experiment.

    Args:
        name: The experiment name.

    Returns:
        ExperimentResponse with experiment details.

    Raises:
        HTTPException 404: Experiment not found.
    """
    found = _get_manager().get_experiment(name)
    if found:
        return _experiment_to_response(found)
    raise HTTPException(
        status_code=404,
        detail=f"Experiment not found: {name}",
    )


# get_experiment_results function · python · L254-L278 (25 LOC)api/routers/experiments.py
def get_experiment_results(name: str) -> ExperimentResultsResponse:
    """Get results and statistics for an experiment.

    Returns win rates per variant, statistical significance indicators,
    and p-value from chi-squared test (for two-variant experiments).

    Args:
        name: The experiment name.

    Returns:
        ExperimentResultsResponse with results and statistics.

    Raises:
        HTTPException 404: The manager produced no results for this name.
    """
    computed = _get_manager().calculate_results(name)
    if computed:
        return _results_to_response(computed)
    raise HTTPException(
        status_code=404,
        detail=f"Experiment not found: {name}",
    )
record_outcome function · python · L291-L345 (55 LOC)api/routers/experiments.py
def record_outcome(name: str, request: RecordOutcomeRequest) -> RecordOutcomeResponse:
"""Record an outcome for an experiment.
Records the user's action (sent_unchanged, sent_edited, dismissed, regenerated)
for tracking conversion rates and calculating statistical significance.
Args:
name: The experiment name.
request: RecordOutcomeRequest with variant_id, contact_id, and action.
Returns:
RecordOutcomeResponse confirming the recorded outcome.
Raises:
HTTPException 400: Invalid action.
HTTPException 404: Experiment not found.
"""
manager = _get_manager()
experiment = manager.get_experiment(name)
if not experiment:
raise HTTPException(
status_code=404,
detail=f"Experiment not found: {name}",
)
# Validate action
try:
action = UserAction(request.action)
except ValueError:
valid_actions = [a.value for a in UserAction]
raise HTTPExccreate_experiment function · python · L359-L442 (84 LOC)api/routers/experiments.py
def create_experiment(request: CreateExperimentRequest) -> ExperimentResponse:
"""Create a new A/B testing experiment.
Each experiment needs at least two variants with allocation weights
that should sum to 100.
**Example Request:**
```json
{
"name": "reply_tone_test",
"description": "Test casual vs professional reply tones",
"enabled": true,
"variants": [
{
"id": "control",
"weight": 50,
"config": {
"system_prompt": "You are a helpful assistant.",
"temperature": 0.7
}
},
{
"id": "treatment",
"weight": 50,
"config": {
"system_prompt": "You are a friendly, casual assistant.",
"temperature": 0.8
}
}
]
}
```
Args:
request: CreateExperimentRequest with eupdate_experiment function · python · L454-L487 (34 LOC)api/routers/experiments.py
def update_experiment(name: str, request: UpdateExperimentRequest) -> ExperimentResponse:
    """Update an existing experiment.

    Currently supports enabling/disabling experiments.

    Args:
        name: The experiment name.
        request: UpdateExperimentRequest with new values.

    Returns:
        ExperimentResponse with updated experiment.

    Raises:
        HTTPException 404: Experiment not found, either before the update
            or when it is reloaded afterwards.
        HTTPException 500: The manager reported that the update failed.
    """
    manager = _get_manager()
    experiment = manager.get_experiment(name)
    if not experiment:
        raise HTTPException(
            status_code=404,
            detail=f"Experiment not found: {name}",
        )

    success = manager.update_experiment(name, enabled=request.enabled)
    if not success:
        raise HTTPException(
            status_code=500,
            detail="Failed to update experiment",
        )

    # Reload to get updated state
    experiment = manager.get_experiment(name)
    if not experiment:
        # The lookup can come back empty (e.g. the experiment was removed in
        # the meantime); report 404 instead of failing inside the converter.
        raise HTTPException(
            status_code=404,
            detail=f"Experiment not found: {name}",
        )
    return _experiment_to_response(experiment)


# delete_experiment function · python · L498-L529 (32 LOC)api/routers/experiments.py
def delete_experiment(name: str) -> dict[str, Any]:
    """Delete an experiment.

    Warning: This will permanently delete the experiment configuration.
    Recorded outcomes are not deleted.

    Args:
        name: The experiment name.

    Returns:
        Confirmation message with ``status`` and ``experiment_name`` keys.

    Raises:
        HTTPException 404: Experiment not found.
        HTTPException 500: The manager reported that the deletion failed.
    """
    manager = _get_manager()
    if not manager.get_experiment(name):
        raise HTTPException(
            status_code=404,
            detail=f"Experiment not found: {name}",
        )

    deleted = manager.delete_experiment(name)
    if deleted:
        return {"status": "deleted", "experiment_name": name}
    raise HTTPException(
        status_code=500,
        detail="Failed to delete experiment",
    )


# clear_experiment_outcomes function · python · L540-L570 (31 LOC)api/routers/experiments.py
def clear_experiment_outcomes(name: str) -> dict[str, Any]:
    """Clear all recorded outcomes for an experiment.

    Use this to reset experiment data when starting a new test cycle.

    Args:
        name: The experiment name.

    Returns:
        Confirmation message with ``status`` and ``experiment_name`` keys.

    Raises:
        HTTPException 404: Experiment not found.
        HTTPException 500: The manager reported that clearing failed.
    """
    manager = _get_manager()
    if not manager.get_experiment(name):
        raise HTTPException(
            status_code=404,
            detail=f"Experiment not found: {name}",
        )

    cleared = manager.clear_outcomes(name)
    if cleared:
        return {"status": "cleared", "experiment_name": name}
    raise HTTPException(
        status_code=500,
        detail="Failed to clear outcomes",
    )


# get_variant_for_contact function · python · L582-L611 (30 LOC)api/routers/experiments.py
def get_variant_for_contact(name: str, contact_id: str) -> VariantConfigResponse | None:
    """Get the variant assignment for a specific contact.

    Uses consistent hashing to ensure the same contact always gets
    the same variant for a given experiment.

    Args:
        name: The experiment name.
        contact_id: Identifier for the contact (phone, email, etc.).

    Returns:
        VariantConfigResponse with the assigned variant, or None when the
        manager yields no variant (e.g. the experiment is disabled).

    Raises:
        HTTPException 404: Experiment not found.
    """
    manager = _get_manager()
    if not manager.get_experiment(name):
        raise HTTPException(
            status_code=404,
            detail=f"Experiment not found: {name}",
        )

    assigned = manager.get_variant(name, contact_id)
    if assigned:
        return VariantConfigResponse(
            id=assigned.id,
            weight=assigned.weight,
            config=assigned.config,
        )
    return None


# export_conversation function · python · L94-L190 (97 LOC)api/routers/export.py
async def export_conversation(
request: Request,
export_request: ExportConversationRequest,
chat_id: str = Path(
...,
min_length=1,
max_length=255,
description="The unique conversation identifier",
examples=["chat123456789"],
),
reader: ChatDBReader = Depends(get_imessage_reader),
) -> ExportResponse:
"""Export a single conversation.
Exports all messages from a conversation in the specified format.
Supports JSON (full data), CSV (flattened), and TXT (human-readable).
**Rate Limiting:**
This endpoint is rate limited to 60 requests per minute.
Args:
chat_id: The conversation ID to export.
request: Export options including format and date range.
http_request: FastAPI request object (for rate limiting)
Returns:
ExportResponse with exported data.
Raises:
HTTPException 408: Request timed out
HTTPException 429: Rate limit exceeded
"""
try:
export_search function · python · L195-L273 (79 LOC)api/routers/export.py
async def export_search(
search_request: ExportSearchRequest,
request: Request,
reader: ChatDBReader = Depends(get_imessage_reader),
) -> ExportResponse:
"""Export search results.
Searches messages and exports the results in the specified format.
**Rate Limiting:**
This endpoint is rate limited to 60 requests per minute.
Args:
request: Search query and export options.
request: FastAPI request object (for rate limiting)
Returns:
ExportResponse with exported search results.
Raises:
HTTPException 408: Request timed out
HTTPException 429: Rate limit exceeded
"""
try:
async with asyncio.timeout(get_timeout_read()):
# Perform search
after = search_request.date_range.start if search_request.date_range else None
before = search_request.date_range.end if search_request.date_range else None
messages = await run_in_threadpool(
readerProvenance: Repobility (https://repobility.com) — every score reproducible from /scan/
export_full_backup function · python · L281-L381 (101 LOC)api/routers/export.py
async def export_full_backup(
backup_request: ExportBackupRequest,
request: Request,
reader: ChatDBReader = Depends(get_imessage_reader),
) -> ExportResponse:
"""Create a full backup of accessible conversations.
Exports multiple conversations with their messages in JSON format.
Only JSON format is supported for full backups.
**Rate Limiting:**
This endpoint is rate limited to 60 requests per minute.
Args:
backup_request: Backup options including limits.
request: FastAPI request object (for rate limiting)
Returns:
ExportResponse with backup data.
Raises:
HTTPException 408: Request timed out
HTTPException 429: Rate limit exceeded
"""
try:
async with asyncio.timeout(TIMEOUT_BACKUP):
# Fetch enough conversations to satisfy offset + limit
fetch_limit = backup_request.offset + backup_request.conversation_limit
conversations = await run_in_threadpool(reaexport_conversation_pdf function · python · L390-L477 (88 LOC)api/routers/export.py
async def export_conversation_pdf(
pdf_request: PDFExportRequest,
chat_id: str = Path(
...,
min_length=1,
max_length=255,
description="The unique conversation identifier",
examples=["chat123456789"],
),
reader: ChatDBReader = Depends(get_imessage_reader),
) -> PDFExportResponse:
"""Export a conversation to PDF format.
Generates a beautifully formatted PDF document with:
- Header with conversation name, participants, and date range
- Messages with styled bubbles (grayscale for printing)
- Inline image attachment thumbnails
- Message reactions
- Page numbers in footer
Args:
chat_id: The conversation ID to export.
request: Export options including attachments, reactions, and date range.
Returns:
PDFExportResponse with base64-encoded PDF data.
Raises:
HTTPException: If conversation not found or export fails.
"""
try:
# Get conversation metadatadownload_conversation_pdf function · python · L481-L561 (81 LOC)api/routers/export.py
async def download_conversation_pdf(
pdf_request: PDFExportRequest,
chat_id: str = Path(
...,
min_length=1,
max_length=255,
description="The unique conversation identifier",
examples=["chat123456789"],
),
reader: ChatDBReader = Depends(get_imessage_reader),
) -> Response:
"""Download a conversation as a PDF file.
Returns the PDF file directly for download instead of base64-encoded data.
Args:
chat_id: The conversation ID to export.
pdf_request: Export options.
Returns:
PDF file response for direct download.
"""
try:
# Get conversation metadata via direct lookup (avoids N+1)
conversation = await run_in_threadpool(reader.get_conversation, chat_id)
if conversation is None:
raise HTTPException(
status_code=404,
detail=f"Conversation not found: {chat_id}",
)
# Get messages with optional date filterecord_feedback function · python · L530-L599 (70 LOC)api/routers/feedback.py
def record_feedback(request: RecordFeedbackRequest) -> RecordFeedbackResponse:
"""Record feedback when a user interacts with a suggestion.
Use this endpoint to track:
- **sent**: User sent the suggestion unchanged (implicit positive signal)
- **edited**: User edited the suggestion before sending (capture for learning)
- **dismissed**: User dismissed the suggestion (implicit negative signal)
- **copied**: User copied the suggestion (partial positive signal)
The endpoint optionally computes evaluation scores for the suggestion
based on tone consistency, relevance, naturalness, and length appropriateness.
"""
# Validate action
try:
action = SuggestionAction(request.action)
except ValueError as e:
raise HTTPException(
status_code=400,
detail=f"Invalid action: {request.action}. "
"Must be one of: sent, edited, dismissed, copied",
) from e
# Validate edited text for edit actionsget_feedback_stats function · python · L609-L643 (35 LOC)api/routers/feedback.py
def get_feedback_stats() -> FeedbackStatsResponse:
"""Get aggregate feedback statistics.
Returns metrics including:
- Total feedback count by action type
- Acceptance rate (suggestions sent unchanged)
- Edit rate (suggestions modified before sending)
- Average evaluation scores across all feedback
Use these metrics to understand how well AI suggestions are meeting user needs.
"""
store = get_feedback_store()
stats = store.get_stats()
# Convert avg_evaluation_scores dict to proper format
avg_scores = stats.get("avg_evaluation_scores")
if avg_scores:
avg_scores = {
"tone_score": round(avg_scores["tone_score"], 3),
"relevance_score": round(avg_scores["relevance_score"], 3),
"naturalness_score": round(avg_scores["naturalness_score"], 3),
"length_score": round(avg_scores["length_score"], 3),
"overall_score": round(avg_scores["overall_score"], 3),
}
return Fget_improvements function · python · L653-L687 (35 LOC)api/routers/feedback.py
def get_improvements(
limit: int = Query(
default=10,
ge=1,
le=50,
description="Maximum number of improvement suggestions to return",
),
) -> ImprovementsResponse:
"""Get suggested improvements based on feedback patterns.
Analyzes patterns in user edits to identify areas for improvement:
- **Length**: Are users making responses shorter or longer?
- **Tone**: Are users adjusting formality levels?
- **Vocabulary**: What words do users frequently add or remove?
Use these insights to tune prompt engineering or model parameters.
"""
store = get_feedback_store()
improvements = store.get_improvements(limit=limit)
stats = store.get_stats()
improvement_list = [
ImprovementSuggestion(
type=imp["type"],
suggestion=imp["suggestion"],
detail=imp["detail"],
confidence=imp["confidence"],
)
for imp in improvements
]
return ImprovementsRget_recent_feedback function · python · L697-L738 (42 LOC)api/routers/feedback.py
def get_recent_feedback(
limit: int = Query(
default=50,
ge=1,
le=200,
description="Maximum number of entries to return",
),
) -> RecentFeedbackResponse:
"""Get recent feedback entries.
Returns the most recent feedback entries in reverse chronological order.
Useful for debugging and monitoring feedback patterns in real-time.
"""
store = get_feedback_store()
entries = store.get_recent_entries(limit=limit)
stats = store.get_stats()
def entry_to_response(entry: FeedbackEntry) -> FeedbackEntryResponse:
eval_scores = None
if entry.evaluation:
eval_scores = EvaluationScores(
tone_score=entry.evaluation.tone_score,
relevance_score=entry.evaluation.relevance_score,
naturalness_score=entry.evaluation.naturalness_score,
length_score=entry.evaluation.length_score,
overall_score=entry.evaluation.overall_score,
evaluate_response function · python · L748-L773 (26 LOC)api/routers/feedback.py
def evaluate_response(request: EvaluateResponseRequest) -> EvaluationScores:
"""Evaluate a response against quality metrics.
Computes scores for:
- **Tone**: How well the response matches the conversation's historical tone
- **Relevance**: Semantic similarity between response and recent context
- **Naturalness**: How natural the response sounds (perplexity-based)
- **Length**: How appropriate the length is compared to user's typical messages
Use this endpoint to preview evaluation scores before recording feedback,
or to evaluate responses from other sources.
"""
evaluator = get_response_evaluator()
result = evaluator.evaluate(
response=request.response,
context_messages=request.context_messages,
user_messages=request.user_messages,
)
return EvaluationScores(
tone_score=result.tone_score,
relevance_score=result.relevance_score,
naturalness_score=result.naturalness_score,
lengthRepobility — the code-quality scanner for AI-generated software · https://repobility.com
get_network_graph function · python · L67-L93 (27 LOC)api/routers/graph.py
def get_network_graph(
include_relationships: list[str] | None = Query(
default=None,
description="Filter by relationship types",
),
min_messages: int = Query(default=1, ge=0, description="Minimum messages"),
days_back: int | None = Query(default=None, ge=1, description="Days of history"),
max_nodes: int = Query(default=100, ge=1, le=500, description="Max nodes"),
layout: Literal["force", "hierarchical", "radial"] = Query(
default="force",
description="Layout algorithm",
),
include_clusters: bool = Query(default=True, description="Run clustering"),
width: int = Query(default=800, ge=100, le=4000, description="Layout width"),
height: int = Query(default=600, ge=100, le=4000, description="Layout height"),
) -> GraphDataSchema:
"""Get the full relationship network graph."""
return build_network_graph(
include_relationships=include_relationships,
min_messages=min_messages,
days_back=daysget_ego_graph function · python · L105-L121 (17 LOC)api/routers/graph.py
def get_ego_graph(
    contact_id: str,
    depth: int = Query(default=1, ge=1, le=3, description="Hops from center"),
    max_neighbors: int = Query(default=20, ge=1, le=100, description="Max neighbors"),
    layout: Literal["force", "radial"] = Query(default="radial", description="Layout"),
    width: int = Query(default=800, ge=100, le=4000, description="Layout width"),
    height: int = Query(default=600, ge=100, le=4000, description="Layout height"),
) -> GraphDataSchema:
    """Get an ego-centric graph centered on a specific contact.

    Args:
        contact_id: Contact placed at the center of the graph.
        depth: Hops from center (1-3).
        max_neighbors: Max neighbors (1-100).
        layout: Layout algorithm, "force" or "radial".
        width: Layout width (100-4000).
        height: Layout height (100-4000).

    Returns:
        GraphDataSchema produced by ``build_ego_graph``.
    """
    # Pure pass-through: validation is done by the Query constraints above.
    ego_graph = build_ego_graph(
        contact_id=contact_id,
        depth=depth,
        max_neighbors=max_neighbors,
        layout=layout,
        width=width,
        height=height,
    )
    return ego_graph


# get_clusters function · python · L159-L169 (11 LOC)api/routers/graph.py
def get_clusters(
    max_nodes: int = Query(default=100, ge=1, le=500, description="Max nodes"),
    resolution: float = Query(
        default=1.0,
        ge=0.1,
        le=5.0,
        description="Clustering resolution (higher = more clusters)",
    ),
) -> ClusterResultSchema:
    """Get community assignments for contacts.

    Args:
        max_nodes: Max nodes (1-500).
        resolution: Clustering resolution, 0.1-5.0 (higher = more clusters).

    Returns:
        ClusterResultSchema produced by ``compute_clusters``.
    """
    cluster_result = compute_clusters(max_nodes=max_nodes, resolution=resolution)
    return cluster_result


# get_graph_evolution function · python · L181-L196 (16 LOC)api/routers/graph.py
def get_graph_evolution(
    from_date: str = Query(description="Start date (YYYY-MM-DD)"),
    to_date: str = Query(description="End date (YYYY-MM-DD)"),
    interval: Literal["day", "week", "month"] = Query(
        default="week",
        description="Snapshot interval",
    ),
    max_nodes: int = Query(default=50, ge=1, le=200, description="Max nodes per snapshot"),
) -> GraphEvolutionResponse:
    """Get graph evolution over a time period.

    Args:
        from_date: Start date (YYYY-MM-DD); passed through as a string.
        to_date: End date (YYYY-MM-DD); passed through as a string.
        interval: Snapshot interval: "day", "week" or "month".
        max_nodes: Max nodes per snapshot (1-200).

    Returns:
        GraphEvolutionResponse produced by ``compute_graph_evolution``.
    """
    evolution = compute_graph_evolution(
        from_date=from_date,
        to_date=to_date,
        interval=interval,
        max_nodes=max_nodes,
    )
    return evolution


# export_graph function · python · L207-L212 (6 LOC)api/routers/graph.py
def export_graph(
    request: ExportGraphRequest,
    max_nodes: int = Query(default=100, ge=1, le=500, description="Max nodes"),
) -> ExportGraphResponse:
    """Export relationship graph in supported formats.

    Args:
        request: ExportGraphRequest forwarded unchanged to the exporter.
        max_nodes: Max nodes (1-500).

    Returns:
        ExportGraphResponse produced by ``export_graph_data``.
    """
    exported = export_graph_data(request=request, max_nodes=max_nodes)
    return exported


# get_system_status function · python · L54-L82 (29 LOC)api/routers/health_debug.py
async def get_system_status(request: Request) -> dict[str, Any]:
process = psutil.Process(os.getpid())
mem_info = process.memory_info()
model_loaded = False
try:
from models import get_generator
gen = get_generator()
model_loaded = gen.is_loaded()
except (ImportError, AttributeError, RuntimeError) as e:
logging.getLogger(__name__).debug(f"Model loaded check failed: {e}")
embedding_available = False
try:
from models.bert_embedder import is_mlx_available
embedding_available = is_mlx_available()
except (ImportError, AttributeError, RuntimeError) as e:
logging.getLogger(__name__).debug(f"Embedding availability check failed: {e}")
return {
"memory_rss_mb": round(mem_info.rss / BYTES_PER_MB, 1),
"memory_vms_mb": round(mem_info.vms / BYTES_PER_MB, 1),
"memory_percent": round(process.memory_percent(), 1),
"cpu_percent": round(process.cpu_percent(), 1),
"model_get_memory_fast function · python · L26-L79 (54 LOC)api/routers/health_readiness.py
def _get_memory_fast() -> tuple[float, float, float]:
"""Get system memory using native macOS command (much faster than psutil)."""
try:
result = subprocess.run(
["vm_stat"],
capture_output=True,
text=True,
timeout=2,
)
if result.returncode != 0:
raise RuntimeError("vm_stat failed")
lines = result.stdout.strip().split("\n")
# Parse page size: "Mach Virtual Memory Statistics: (page size of 16384 bytes)"
page_size = 4096
if len(lines) > 0 and "page size of" in lines[0]:
match = re.search(r"page size of (\d+) bytes", lines[0])
if match:
page_size = int(match.group(1))
stats = {}
for line in lines:
if ":" in line:
key, value = line.split(":", 1)
try:
stats[key.strip()] = int(value.strip().rstrip("."))
except ValueError:
_get_process_memory function · python · L82-L89 (8 LOC)api/routers/health_readiness.py
def _get_process_memory() -> tuple[float, float]:
    """Return the current process's memory usage in megabytes.

    Returns:
        ``(rss_mb, vms_mb)`` for this process, or ``(0.0, 0.0)`` when the
        lookup raises OSError or AttributeError (the error is logged).
    """
    try:
        usage = psutil.Process(os.getpid()).memory_info()
        rss_mb = usage.rss / BYTES_PER_MB
        vms_mb = usage.vms / BYTES_PER_MB
        return rss_mb, vms_mb
    except (OSError, AttributeError) as e:
        logger.error(f"Failed to fetch process memory: {e}")
        return 0.0, 0.0
_check_imessage_access function · python · L92-L102 (11 LOC)api/routers/health_readiness.py
def _check_imessage_access() -> bool:
    """Check whether the iMessage database can be accessed.

    Opens a ChatDBReader, asks it to verify access, and always closes the
    reader again, including when the access check itself raises.

    Returns:
        The result of ``reader.check_access()``, or False when an
        OSError/PermissionError is raised (the error is logged).
    """
    try:
        from integrations.imessage import ChatDBReader

        reader = ChatDBReader()
        try:
            return reader.check_access()
        finally:
            # Release the reader even if check_access() fails; previously a
            # raising check skipped close() and leaked the reader.
            reader.close()
    except (OSError, PermissionError) as e:
        logger.error(f"iMessage access check failed: {e}")
        return False


# _get_memory_mode function · python · L105-L110 (6 LOC)api/routers/health_readiness.py
def _get_memory_mode(available_gb: float) -> str:
if available_gb >= 4.0:
return "FULL"
if available_gb >= 2.0:
return "LITE"
return "MINIMAL"_check_model_loaded function · python · L113-L120 (8 LOC)api/routers/health_readiness.py
def _check_model_loaded() -> bool:
try:
from models import get_generator
generator = get_generator()
return generator.is_loaded()
except Exception:
return False_get_model_info function · python · L123-L154 (32 LOC)api/routers/health_readiness.py
def _get_model_info() -> ModelInfo | None:
    """Return details about the current model, using the model-info cache.

    A cached entry under the "model_info" key is returned as-is. Otherwise
    the generator's ``_loader`` is asked for the current model info, and a
    successfully built ModelInfo is stored in the cache before returning.

    Returns:
        ModelInfo for the current model, or None when the generator, its
        loader, or the loader's info is missing, or when one of the
        handled errors occurs.
    """
    info_cache = get_model_info_cache()
    hit, cached_value = info_cache.get("model_info")
    if hit:
        return cached_value  # type: ignore[no-any-return]

    try:
        from models import get_generator

        generator = get_generator()
        if generator is None:
            return None

        # The loader is a private attribute and may be absent.
        loader = getattr(generator, "_loader", None)
        if loader is None:
            return None

        raw = loader.get_current_model_info()
        if raw is None:
            return None

        model_info = ModelInfo(
            id=raw.get("id"),
            display_name=raw.get("display_name", "Unknown"),
            loaded=raw.get("loaded", False),
            memory_usage_mb=raw.get("memory_usage_mb", 0.0),
            quality_tier=raw.get("quality_tier"),
        )
        info_cache.set("model_info", model_info)
        return model_info
    except (ImportError, AttributeError, KeyError, TypeError, RuntimeError):
        return None


# _get_recommended_model function · python · L157-L164 (8 LOC)api/routers/health_readiness.py
def _get_recommended_model(total_ram_gb: float) -> str | None:
try:
from models import get_recommended_model
spec = get_recommended_model(total_ram_gb)
return spec.id
except Exception:
return Noneget_health function · python · L169-L214 (46 LOC)api/routers/health_readiness.py
async def get_health(request: Request) -> HealthResponse:
cache = get_health_cache()
found, cached = cache.get("health_status")
if found:
return cached # type: ignore[no-any-return]
# Use fast native command instead of slow psutil
available_gb, used_gb, total_gb = _get_memory_fast()
jarvis_rss_mb, jarvis_vms_mb = _get_process_memory()
imessage_access = await run_in_threadpool(_check_imessage_access)
memory_mode = _get_memory_mode(available_gb)
model_loaded = _check_model_loaded()
model_info = _get_model_info()
recommended_model = _get_recommended_model(total_gb)
details: dict[str, str] = {}
if not imessage_access:
details["imessage"] = "Full Disk Access required"
if available_gb < 2.0:
details["memory"] = f"Low memory: {available_gb:.1f}GB available"
if not imessage_access:
status = "unhealthy"
elif available_gb < 2.0:
status = "degraded"
else:
status = "healthy"
get_diagnostic function · python · L225-L255 (31 LOC)api/routers/health_readiness.py
async def get_diagnostic(request: Request) -> dict[str, Any]:
checks: dict[str, Any] = {}
issues: list[str] = []
checks["memory"] = {
"status": "ok",
"available_gb": round(psutil.virtual_memory().available / BYTES_PER_GB, 2),
}
try:
from models import get_generator
gen = get_generator()
checks["model"] = {"status": "ok", "loaded": gen.is_loaded()}
except (ImportError, AttributeError, RuntimeError) as e:
checks["model"] = {"status": "error", "message": str(e)}
issues.append(f"Model check failed: {e}")
try:
from integrations.imessage import ChatDBReader
reader = ChatDBReader()
_ = len(reader.get_conversations(limit=1))
reader.close()
checks["database"] = {"status": "ok", "accessible": True}
except (OSError, PermissionError, RuntimeError) as e:
checks["database"] = {"status": "error", "message": str(e)}
issues.append(f"Database access failed:_format_prometheus_metric function · python · L30-L60 (31 LOC)api/routers/metrics.py
def _format_prometheus_metric(
name: str,
value: float | int,
labels: dict[str, str] | None = None,
metric_type: str = "gauge",
help_text: str = "",
) -> str:
"""Format a metric in Prometheus text format.
Args:
name: Metric name
value: Metric value
labels: Optional labels
metric_type: Type (gauge, counter, histogram)
help_text: Help text
Returns:
Prometheus-formatted metric string
"""
lines = []
if help_text:
lines.append(f"# HELP {name} {help_text}")
lines.append(f"# TYPE {name} {metric_type}")
if labels:
label_str = ",".join(f'{k}="{v}"' for k, v in labels.items())
lines.append(f"{name}{{{label_str}}} {value}")
else:
lines.append(f"{name} {value}")
return "\n".join(lines)Same scanner, your repo: https://repobility.com — Repobility
get_prometheus_metrics function · python · L65-L163 (99 LOC)api/routers/metrics.py
async def get_prometheus_metrics(request: Request) -> str:
"""Get all metrics in Prometheus text format.
Returns metrics compatible with Prometheus scraping:
- jarvis_memory_rss_bytes: Current RSS memory usage
- jarvis_memory_vms_bytes: Current VMS memory usage
- jarvis_memory_available_bytes: System available memory
- jarvis_requests_total: Total request count by endpoint
- jarvis_request_duration_seconds: Request latency histogram
"""
lines: list[str] = []
# Process memory metrics
process = psutil.Process(os.getpid())
mem_info = process.memory_info()
system_mem = psutil.virtual_memory()
lines.append(
_format_prometheus_metric(
"jarvis_memory_rss_bytes",
mem_info.rss,
metric_type="gauge",
help_text="Resident Set Size in bytes",
)
)
lines.append(
_format_prometheus_metric(
"jarvis_memory_vms_bytes",
mem_info.vms,
get_memory_metrics function · python · L168-L220 (53 LOC)api/routers/metrics.py
async def get_memory_metrics(request: Request) -> dict[str, Any]:
"""Get detailed memory breakdown.
Returns:
Detailed memory statistics including:
- Current process memory (RSS, VMS)
- System memory (total, available, used)
- Memory sampling history stats
- Metal GPU memory (if available)
"""
process = psutil.Process(os.getpid())
mem_info = process.memory_info()
system_mem = psutil.virtual_memory()
sampler = get_memory_sampler()
sample_stats = sampler.get_stats()
# Try to get Metal memory if MLX is available
metal_mb = 0.0
try:
import mlx.core as mx
metal_mb = mx.metal.get_peak_memory() / BYTES_PER_MB
except (ImportError, AttributeError):
pass
# Get recent samples for trend data
recent_samples = sampler.get_samples()
trend_data = []
for sample in recent_samples[-60:]: # Last 60 samples
trend_data.append(
{
"timestamp":get_latency_metrics function · python · L225-L260 (36 LOC)api/routers/metrics.py
async def get_latency_metrics(request: Request) -> dict[str, Any]:
"""Get request latency percentiles.
Returns:
Latency statistics by operation including:
- Count of observations
- Mean, min, max latency
- p50, p90, p95, p99 percentiles
"""
histogram = get_latency_histogram()
all_stats = histogram.get_stats()
result: dict[str, Any] = {"operations": {}}
for operation, stats in all_stats.items():
percentiles = histogram.get_percentiles(operation)
result["operations"][operation] = {
"count": stats["count"],
"mean_ms": stats["mean_ms"],
"p50_ms": round(percentiles["p50"] * 1000, 3),
"p90_ms": round(percentiles["p90"] * 1000, 3),
"p95_ms": round(percentiles["p95"] * 1000, 3),
"p99_ms": round(percentiles["p99"] * 1000, 3),
}
# Add summary stats
counter = get_request_counter()
counter_stats = counter.get_stats()
result[get_request_metrics function · python · L265-L275 (11 LOC)api/routers/metrics.py
async def get_request_metrics(request: Request) -> dict[str, Any]:
    """Report request counts from the shared request counter.

    Returns:
        A dict with two keys: ``endpoints`` (the counter's ``get_all()``
        result, described by the original docs as counts grouped by endpoint
        and method) and ``stats`` (the counter's ``get_stats()`` result).
    """
    # `request` is not read in this body.
    # NOTE(review): presumably consumed by a decorator outside this view — confirm.
    request_counter = get_request_counter()
    per_endpoint = request_counter.get_all()
    aggregate = request_counter.get_stats()
    return {"endpoints": per_endpoint, "stats": aggregate}
# trigger_gc function · python · L280-L286 (7 LOC)api/routers/metrics.py
async def trigger_gc(request: Request) -> dict[str, Any]:
    """Run a forced garbage collection and hand back its report.

    Returns:
        Whatever ``force_gc()`` returns, unchanged. The original docs describe
        it as memory usage before and after GC plus the objects collected.
    """
    # `request` is not read in this body; all the work is delegated to force_gc().
    gc_report = force_gc()
    return gc_report
# take_memory_sample function · python · L291-L305 (15 LOC)api/routers/metrics.py
async def take_memory_sample(request: Request) -> dict[str, Any]:
    """Capture a memory sample right now and return it as plain data.

    Returns:
        A dict with the sample's ISO-formatted ``timestamp`` plus ``rss_mb``,
        ``vms_mb``, ``percent`` and ``available_gb``, each rounded to two
        decimal places.
    """
    snapshot = get_memory_sampler().sample_now()

    # Timestamp goes in first so the key order matches the response layout.
    payload: dict[str, Any] = {"timestamp": snapshot.timestamp.isoformat()}
    for field_name in ("rss_mb", "vms_mb", "percent", "available_gb"):
        payload[field_name] = round(getattr(snapshot, field_name), 2)
    return payload
# reset_metrics function · python · L310-L322 (13 LOC)api/routers/metrics.py
async def reset_metrics(request: Request) -> dict[str, str]:
    """Reset the request counter and the latency histogram.

    The memory sampler is not touched by this function.

    Returns:
        A fixed confirmation payload with ``status`` and ``message`` keys.
    """
    # Both collectors are looked up before either one is reset, counter first.
    collectors = (get_request_counter(), get_latency_histogram())
    for collector in collectors:
        collector.reset()
    return {"status": "ok", "message": "Metrics counters reset"}
# get_priority_inbox function · python · L367-L438 (72 LOC)api/routers/priority.py
def get_priority_inbox(
limit: Annotated[
int,
Query(
ge=1,
le=200,
description="Maximum number of messages to analyze",
),
] = 50,
include_handled: Annotated[
bool,
Query(
description="Whether to include already-handled messages",
),
] = False,
min_level: Annotated[
str | None,
Query(
description="Minimum priority level to include (critical, high, medium, low)",
),
] = None,
reader: ChatDBReader = Depends(get_imessage_reader),
) -> PriorityInboxResponse:
"""Get prioritized messages sorted by importance.
Returns messages from recent conversations scored and sorted by priority.
Messages are analyzed for questions, action items, time-sensitive content,
and sender importance.
**Priority Levels:**
- **critical**: Score >= 0.8 - Urgent, needs immediate attention
- **high**: Score >= 0.6 - Important, shouProvenance: Repobility (https://repobility.com) — every score reproducible from /scan/
mark_handled function · python · L462-L489 (28 LOC)api/routers/priority.py
def mark_handled(request: MarkHandledRequest) -> MarkHandledResponse:
    """Flag a single message as handled.

    Meant for messages the user has already dealt with (responded, noted,
    etc.) so they can be removed from the priority inbox.

    **Example Request:**
    ```json
    {
        "chat_id": "chat123456",
        "message_id": 12345
    }
    ```

    Args:
        request: MarkHandledRequest carrying the chat_id and message_id.

    Returns:
        MarkHandledResponse echoing both identifiers with ``handled=True``.
    """
    chat_id = request.chat_id
    message_id = request.message_id

    # The state change is delegated to mark_handled_state; the response is
    # only built once that call has returned.
    mark_handled_state(chat_id=chat_id, message_id=message_id, handled=True)

    return MarkHandledResponse(
        success=True,
        chat_id=chat_id,
        message_id=message_id,
        handled=True,
    )
# unmark_handled function · python · L513-L539 (27 LOC)api/routers/priority.py
def unmark_handled(request: MarkHandledRequest) -> MarkHandledResponse:
    """Clear the handled flag on a single message.

    Use this to put a message back into the priority inbox.

    **Example Request:**
    ```json
    {
        "chat_id": "chat123456",
        "message_id": 12345
    }
    ```

    Args:
        request: MarkHandledRequest carrying the chat_id and message_id.

    Returns:
        MarkHandledResponse echoing both identifiers with ``handled=False``.
    """
    chat_id = request.chat_id
    message_id = request.message_id

    # Same helper as marking, called with handled=False.
    mark_handled_state(chat_id=chat_id, message_id=message_id, handled=False)

    return MarkHandledResponse(
        success=True,
        chat_id=chat_id,
        message_id=message_id,
        handled=False,
    )
# mark_contact_important function · python · L562-L588 (27 LOC)api/routers/priority.py
def mark_contact_important(request: ImportantContactRequest) -> ImportantContactResponse:
    """Set or clear the "important" flag for a contact.

    Messages from important contacts receive higher priority scores, so this
    is the way to make sure key people are always prioritized.

    **Example Request:**
    ```json
    {
        "identifier": "+1234567890",
        "important": true
    }
    ```

    Args:
        request: ImportantContactRequest carrying the contact identifier and
            the desired importance flag.

    Returns:
        ImportantContactResponse echoing the identifier and flag that were
        passed in.
    """
    identifier = request.identifier
    is_important = request.important

    # The flag is forwarded as given: True marks the contact, False unmarks it.
    mark_contact_importance(identifier=identifier, important=is_important)

    return ImportantContactResponse(
        success=True,
        identifier=identifier,
        important=is_important,
    )