← back to jwalin-shah__jarvis-ai-assistant

Function bodies 1,000 total

All specs Real LLM only Function bodies
summarize_conversation function · python · L277-L361 (85 LOC)
api/routers/drafts.py
async def summarize_conversation(
    summary_request: DraftSummaryRequest,
    request: Request,
    reader: ChatDBReader = Depends(get_imessage_reader),
) -> DraftSummaryResponse:
    """Summarize a conversation using AI.

    Analyzes the specified number of messages from a conversation and generates
    a concise summary with key points. Uses the local MLX model for processing.

    **Rate Limiting:**
    This endpoint is rate limited to 10 requests per minute to prevent
    resource exhaustion from CPU-intensive model generation.

    **What You Get:**
    - A 1-2 sentence summary of the conversation
    - 2-4 key points extracted from the discussion
    - The date range of messages included in the summary

    **Use Cases:**
    - Catch up on a conversation you've missed
    - Get a quick overview of a long group chat
    - Find important decisions or action items

    **Example Request:**
    ```json
    {
        "chat_id": "chat123456789",
        "num_messages": 50
    }
    
generate_smart_reply function · python · L411-L446 (36 LOC)
api/routers/drafts.py
async def generate_smart_reply(
    routed_request: RoutedReplyRequest,
    request: Request,
    reader: ChatDBReader = Depends(get_imessage_reader),
) -> RoutedReplyResponse:
    """Generate a smart routed reply using the simplified router.

    Current behavior:
    - Non-empty messages: generated reply path.
    - Empty/invalid messages: clarify response.
    - Similarity and mobilization data influence prompt quality and confidence.

    Args:
        routed_request: RoutedReplyRequest with chat_id and optional last_message
        request: FastAPI request object (for rate limiting)

    Returns:
        RoutedReplyResponse with response, type, confidence, and metadata
    """
    messages = await fetch_messages(reader, routed_request.chat_id, routed_request.context_messages)
    ensure_messages_exist(routed_request.chat_id, messages)
    last_message, thread_context, context_info = build_smart_reply_input(
        messages=messages,
        requested_last_message=routed_request.l
_experiment_to_response function · python · L133-L143 (11 LOC)
api/routers/experiments.py
def _experiment_to_response(exp: Any) -> ExperimentResponse:
    """Build the API response model for an experiment.

    Args:
        exp: Experiment-like object exposing ``name``, ``enabled``,
            ``description``, ``created_at`` and an iterable ``variants`` whose
            items expose ``id``, ``weight`` and ``config``.

    Returns:
        ExperimentResponse mirroring those fields, with one
        VariantConfigResponse per variant (in the original order).
    """

    def to_variant_response(variant: Any) -> VariantConfigResponse:
        # Field-for-field copy of a single variant.
        return VariantConfigResponse(
            id=variant.id,
            weight=variant.weight,
            config=variant.config,
        )

    return ExperimentResponse(
        name=exp.name,
        enabled=exp.enabled,
        description=exp.description,
        created_at=exp.created_at,
        variants=[to_variant_response(variant) for variant in exp.variants],
    )
_results_to_response function · python · L146-L166 (21 LOC)
api/routers/experiments.py
def _results_to_response(results: ExperimentResults) -> ExperimentResultsResponse:
    """Build the API response model for computed experiment results.

    Args:
        results: ExperimentResults whose ``variants`` each expose the
            per-variant counters copied below.

    Returns:
        ExperimentResultsResponse carrying one VariantResultsResponse per
        variant plus the experiment-level totals, winner, significance flag
        and p-value taken directly from ``results``.
    """

    def to_variant_results(variant):
        # Field-for-field copy of one variant's counters and conversion rate.
        return VariantResultsResponse(
            variant_id=variant.variant_id,
            total_impressions=variant.total_impressions,
            sent_unchanged=variant.sent_unchanged,
            sent_edited=variant.sent_edited,
            dismissed=variant.dismissed,
            regenerated=variant.regenerated,
            conversion_rate=variant.conversion_rate,
        )

    return ExperimentResultsResponse(
        experiment_name=results.experiment_name,
        variants=[to_variant_results(variant) for variant in results.variants],
        total_outcomes=results.total_outcomes,
        winner=results.winner,
        is_significant=results.is_significant,
        p_value=results.p_value,
    )
list_experiments function · python · L187-L209 (23 LOC)
api/routers/experiments.py
def list_experiments(active_only: bool = False) -> ExperimentListResponse:
    """Return the configured experiments, optionally only the active ones.

    Args:
        active_only: When true, the manager's active experiments are listed;
            otherwise everything the manager returns is included.

    Returns:
        ExperimentListResponse holding the converted experiments and their
        count.
    """
    manager = _get_manager()

    # Pick the manager query first, then run it once.
    fetch = manager.get_active_experiments if active_only else manager.get_experiments
    found = fetch()

    return ExperimentListResponse(
        experiments=[_experiment_to_response(item) for item in found],
        total=len(found),
    )
get_experiment function · python · L221-L242 (22 LOC)
api/routers/experiments.py
def get_experiment(name: str) -> ExperimentResponse:
    """Look up one experiment by name.

    Args:
        name: The experiment name.

    Returns:
        ExperimentResponse describing the experiment.

    Raises:
        HTTPException 404: The manager has no (truthy) experiment for ``name``.
    """
    found = _get_manager().get_experiment(name)
    if found:
        return _experiment_to_response(found)

    raise HTTPException(
        status_code=404,
        detail=f"Experiment not found: {name}",
    )
get_experiment_results function · python · L254-L278 (25 LOC)
api/routers/experiments.py
def get_experiment_results(name: str) -> ExperimentResultsResponse:
    """Return the calculated results for an experiment.

    The statistics themselves come from the manager's ``calculate_results``;
    this handler only converts them to the response model.

    Args:
        name: The experiment name.

    Returns:
        ExperimentResultsResponse with per-variant results, the winner, the
        significance flag and the p-value.

    Raises:
        HTTPException 404: ``calculate_results`` returned nothing for ``name``
            (reported to the client as "Experiment not found").
    """
    computed = _get_manager().calculate_results(name)
    if computed:
        return _results_to_response(computed)

    raise HTTPException(
        status_code=404,
        detail=f"Experiment not found: {name}",
    )
Same scanner, your repo: https://repobility.com — Repobility
record_outcome function · python · L291-L345 (55 LOC)
api/routers/experiments.py
def record_outcome(name: str, request: RecordOutcomeRequest) -> RecordOutcomeResponse:
    """Record an outcome for an experiment.

    Records the user's action (sent_unchanged, sent_edited, dismissed, regenerated)
    for tracking conversion rates and calculating statistical significance.

    Args:
        name: The experiment name.
        request: RecordOutcomeRequest with variant_id, contact_id, and action.

    Returns:
        RecordOutcomeResponse confirming the recorded outcome.

    Raises:
        HTTPException 400: Invalid action.
        HTTPException 404: Experiment not found.
    """
    manager = _get_manager()
    experiment = manager.get_experiment(name)

    if not experiment:
        raise HTTPException(
            status_code=404,
            detail=f"Experiment not found: {name}",
        )

    # Validate action
    try:
        action = UserAction(request.action)
    except ValueError:
        valid_actions = [a.value for a in UserAction]
        raise HTTPExc
create_experiment function · python · L359-L442 (84 LOC)
api/routers/experiments.py
def create_experiment(request: CreateExperimentRequest) -> ExperimentResponse:
    """Create a new A/B testing experiment.

    Each experiment needs at least two variants with allocation weights
    that should sum to 100.

    **Example Request:**
    ```json
    {
        "name": "reply_tone_test",
        "description": "Test casual vs professional reply tones",
        "enabled": true,
        "variants": [
            {
                "id": "control",
                "weight": 50,
                "config": {
                    "system_prompt": "You are a helpful assistant.",
                    "temperature": 0.7
                }
            },
            {
                "id": "treatment",
                "weight": 50,
                "config": {
                    "system_prompt": "You are a friendly, casual assistant.",
                    "temperature": 0.8
                }
            }
        ]
    }
    ```

    Args:
        request: CreateExperimentRequest with e
update_experiment function · python · L454-L487 (34 LOC)
api/routers/experiments.py
def update_experiment(name: str, request: UpdateExperimentRequest) -> ExperimentResponse:
    """Apply an update to an existing experiment.

    Only the ``enabled`` flag from the request is forwarded to the manager.

    Args:
        name: The experiment name.
        request: UpdateExperimentRequest carrying the new ``enabled`` value.

    Returns:
        ExperimentResponse built from the experiment as re-read after the
        update.

    Raises:
        HTTPException 404: Experiment not found.
        HTTPException 500: The manager reported that the update failed.
    """
    manager = _get_manager()

    if not manager.get_experiment(name):
        raise HTTPException(
            status_code=404,
            detail=f"Experiment not found: {name}",
        )

    if not manager.update_experiment(name, enabled=request.enabled):
        raise HTTPException(
            status_code=500,
            detail="Failed to update experiment",
        )

    # Fetch again so the response reflects the state after the update.
    refreshed = manager.get_experiment(name)
    return _experiment_to_response(refreshed)
delete_experiment function · python · L498-L529 (32 LOC)
api/routers/experiments.py
def delete_experiment(name: str) -> dict[str, Any]:
    """Delete an experiment's configuration.

    NOTE: whether recorded outcomes survive the deletion is decided inside the
    manager, not in this handler (earlier documentation states they are kept).

    Args:
        name: The experiment name.

    Returns:
        ``{"status": "deleted", "experiment_name": name}`` on success.

    Raises:
        HTTPException 404: Experiment not found.
        HTTPException 500: The manager reported that the deletion failed.
    """
    manager = _get_manager()

    if not manager.get_experiment(name):
        raise HTTPException(
            status_code=404,
            detail=f"Experiment not found: {name}",
        )

    deleted = manager.delete_experiment(name)
    if deleted:
        return {"status": "deleted", "experiment_name": name}

    raise HTTPException(
        status_code=500,
        detail="Failed to delete experiment",
    )
clear_experiment_outcomes function · python · L540-L570 (31 LOC)
api/routers/experiments.py
def clear_experiment_outcomes(name: str) -> dict[str, Any]:
    """Clear the recorded outcomes of an experiment.

    Intended for resetting experiment data before a new test cycle.

    Args:
        name: The experiment name.

    Returns:
        ``{"status": "cleared", "experiment_name": name}`` on success.

    Raises:
        HTTPException 404: Experiment not found.
        HTTPException 500: The manager reported that clearing failed.
    """
    manager = _get_manager()

    if not manager.get_experiment(name):
        raise HTTPException(
            status_code=404,
            detail=f"Experiment not found: {name}",
        )

    cleared = manager.clear_outcomes(name)
    if cleared:
        return {"status": "cleared", "experiment_name": name}

    raise HTTPException(
        status_code=500,
        detail="Failed to clear outcomes",
    )
get_variant_for_contact function · python · L582-L611 (30 LOC)
api/routers/experiments.py
def get_variant_for_contact(name: str, contact_id: str) -> VariantConfigResponse | None:
    """Return the variant the manager assigns to a contact.

    The assignment itself is delegated to the manager's ``get_variant``; this
    handler only validates the experiment and converts the result.

    Args:
        name: The experiment name.
        contact_id: Identifier for the contact (phone, email, etc.).

    Returns:
        VariantConfigResponse for the assigned variant, or None when the
        manager returns no variant (presumably a disabled experiment —
        confirm against the manager).

    Raises:
        HTTPException 404: Experiment not found.
    """
    manager = _get_manager()

    if not manager.get_experiment(name):
        raise HTTPException(
            status_code=404,
            detail=f"Experiment not found: {name}",
        )

    assigned = manager.get_variant(name, contact_id)
    if assigned:
        return VariantConfigResponse(
            id=assigned.id,
            weight=assigned.weight,
            config=assigned.config,
        )
    return None
export_conversation function · python · L94-L190 (97 LOC)
api/routers/export.py
async def export_conversation(
    request: Request,
    export_request: ExportConversationRequest,
    chat_id: str = Path(
        ...,
        min_length=1,
        max_length=255,
        description="The unique conversation identifier",
        examples=["chat123456789"],
    ),
    reader: ChatDBReader = Depends(get_imessage_reader),
) -> ExportResponse:
    """Export a single conversation.

    Exports all messages from a conversation in the specified format.
    Supports JSON (full data), CSV (flattened), and TXT (human-readable).

    **Rate Limiting:**
    This endpoint is rate limited to 60 requests per minute.

    Args:
        chat_id: The conversation ID to export.
        export_request: Export options including format and date range.
        request: FastAPI request object (for rate limiting)

    Returns:
        ExportResponse with exported data.

    Raises:
        HTTPException 408: Request timed out
        HTTPException 429: Rate limit exceeded
    """
    try:
  
export_search function · python · L195-L273 (79 LOC)
api/routers/export.py
async def export_search(
    search_request: ExportSearchRequest,
    request: Request,
    reader: ChatDBReader = Depends(get_imessage_reader),
) -> ExportResponse:
    """Export search results.

    Searches messages and exports the results in the specified format.

    **Rate Limiting:**
    This endpoint is rate limited to 60 requests per minute.

    Args:
        search_request: Search query and export options.
        request: FastAPI request object (for rate limiting)

    Returns:
        ExportResponse with exported search results.

    Raises:
        HTTPException 408: Request timed out
        HTTPException 429: Rate limit exceeded
    """
    try:
        async with asyncio.timeout(get_timeout_read()):
            # Perform search
            after = search_request.date_range.start if search_request.date_range else None
            before = search_request.date_range.end if search_request.date_range else None

            messages = await run_in_threadpool(
                reader
Provenance: Repobility (https://repobility.com) — every score reproducible from /scan/
export_full_backup function · python · L281-L381 (101 LOC)
api/routers/export.py
async def export_full_backup(
    backup_request: ExportBackupRequest,
    request: Request,
    reader: ChatDBReader = Depends(get_imessage_reader),
) -> ExportResponse:
    """Create a full backup of accessible conversations.

    Exports multiple conversations with their messages in JSON format.
    Only JSON format is supported for full backups.

    **Rate Limiting:**
    This endpoint is rate limited to 60 requests per minute.

    Args:
        backup_request: Backup options including limits.
        request: FastAPI request object (for rate limiting)

    Returns:
        ExportResponse with backup data.

    Raises:
        HTTPException 408: Request timed out
        HTTPException 429: Rate limit exceeded
    """
    try:
        async with asyncio.timeout(TIMEOUT_BACKUP):
            # Fetch enough conversations to satisfy offset + limit
            fetch_limit = backup_request.offset + backup_request.conversation_limit
            conversations = await run_in_threadpool(rea
export_conversation_pdf function · python · L390-L477 (88 LOC)
api/routers/export.py
async def export_conversation_pdf(
    pdf_request: PDFExportRequest,
    chat_id: str = Path(
        ...,
        min_length=1,
        max_length=255,
        description="The unique conversation identifier",
        examples=["chat123456789"],
    ),
    reader: ChatDBReader = Depends(get_imessage_reader),
) -> PDFExportResponse:
    """Export a conversation to PDF format.

    Generates a beautifully formatted PDF document with:
    - Header with conversation name, participants, and date range
    - Messages with styled bubbles (grayscale for printing)
    - Inline image attachment thumbnails
    - Message reactions
    - Page numbers in footer

    Args:
        chat_id: The conversation ID to export.
        pdf_request: Export options including attachments, reactions, and date range.

    Returns:
        PDFExportResponse with base64-encoded PDF data.

    Raises:
        HTTPException: If conversation not found or export fails.
    """
    try:
        # Get conversation metadata
download_conversation_pdf function · python · L481-L561 (81 LOC)
api/routers/export.py
async def download_conversation_pdf(
    pdf_request: PDFExportRequest,
    chat_id: str = Path(
        ...,
        min_length=1,
        max_length=255,
        description="The unique conversation identifier",
        examples=["chat123456789"],
    ),
    reader: ChatDBReader = Depends(get_imessage_reader),
) -> Response:
    """Download a conversation as a PDF file.

    Returns the PDF file directly for download instead of base64-encoded data.

    Args:
        chat_id: The conversation ID to export.
        pdf_request: Export options.

    Returns:
        PDF file response for direct download.
    """
    try:
        # Get conversation metadata via direct lookup (avoids N+1)
        conversation = await run_in_threadpool(reader.get_conversation, chat_id)

        if conversation is None:
            raise HTTPException(
                status_code=404,
                detail=f"Conversation not found: {chat_id}",
            )

        # Get messages with optional date filte
record_feedback function · python · L530-L599 (70 LOC)
api/routers/feedback.py
def record_feedback(request: RecordFeedbackRequest) -> RecordFeedbackResponse:
    """Record feedback when a user interacts with a suggestion.

    Use this endpoint to track:
    - **sent**: User sent the suggestion unchanged (implicit positive signal)
    - **edited**: User edited the suggestion before sending (capture for learning)
    - **dismissed**: User dismissed the suggestion (implicit negative signal)
    - **copied**: User copied the suggestion (partial positive signal)

    The endpoint optionally computes evaluation scores for the suggestion
    based on tone consistency, relevance, naturalness, and length appropriateness.
    """
    # Validate action
    try:
        action = SuggestionAction(request.action)
    except ValueError as e:
        raise HTTPException(
            status_code=400,
            detail=f"Invalid action: {request.action}. "
            "Must be one of: sent, edited, dismissed, copied",
        ) from e

    # Validate edited text for edit actions
get_feedback_stats function · python · L609-L643 (35 LOC)
api/routers/feedback.py
def get_feedback_stats() -> FeedbackStatsResponse:
    """Get aggregate feedback statistics.

    Returns metrics including:
    - Total feedback count by action type
    - Acceptance rate (suggestions sent unchanged)
    - Edit rate (suggestions modified before sending)
    - Average evaluation scores across all feedback

    Use these metrics to understand how well AI suggestions are meeting user needs.
    """
    store = get_feedback_store()
    stats = store.get_stats()

    # Convert avg_evaluation_scores dict to proper format
    avg_scores = stats.get("avg_evaluation_scores")
    if avg_scores:
        avg_scores = {
            "tone_score": round(avg_scores["tone_score"], 3),
            "relevance_score": round(avg_scores["relevance_score"], 3),
            "naturalness_score": round(avg_scores["naturalness_score"], 3),
            "length_score": round(avg_scores["length_score"], 3),
            "overall_score": round(avg_scores["overall_score"], 3),
        }

    return F
get_improvements function · python · L653-L687 (35 LOC)
api/routers/feedback.py
def get_improvements(
    limit: int = Query(
        default=10,
        ge=1,
        le=50,
        description="Maximum number of improvement suggestions to return",
    ),
) -> ImprovementsResponse:
    """Get suggested improvements based on feedback patterns.

    Analyzes patterns in user edits to identify areas for improvement:
    - **Length**: Are users making responses shorter or longer?
    - **Tone**: Are users adjusting formality levels?
    - **Vocabulary**: What words do users frequently add or remove?

    Use these insights to tune prompt engineering or model parameters.
    """
    store = get_feedback_store()
    improvements = store.get_improvements(limit=limit)
    stats = store.get_stats()

    improvement_list = [
        ImprovementSuggestion(
            type=imp["type"],
            suggestion=imp["suggestion"],
            detail=imp["detail"],
            confidence=imp["confidence"],
        )
        for imp in improvements
    ]

    return ImprovementsR
get_recent_feedback function · python · L697-L738 (42 LOC)
api/routers/feedback.py
def get_recent_feedback(
    limit: int = Query(
        default=50,
        ge=1,
        le=200,
        description="Maximum number of entries to return",
    ),
) -> RecentFeedbackResponse:
    """Get recent feedback entries.

    Returns the most recent feedback entries in reverse chronological order.
    Useful for debugging and monitoring feedback patterns in real-time.
    """
    store = get_feedback_store()
    entries = store.get_recent_entries(limit=limit)
    stats = store.get_stats()

    def entry_to_response(entry: FeedbackEntry) -> FeedbackEntryResponse:
        eval_scores = None
        if entry.evaluation:
            eval_scores = EvaluationScores(
                tone_score=entry.evaluation.tone_score,
                relevance_score=entry.evaluation.relevance_score,
                naturalness_score=entry.evaluation.naturalness_score,
                length_score=entry.evaluation.length_score,
                overall_score=entry.evaluation.overall_score,
        
evaluate_response function · python · L748-L773 (26 LOC)
api/routers/feedback.py
def evaluate_response(request: EvaluateResponseRequest) -> EvaluationScores:
    """Evaluate a response against quality metrics.

    Computes scores for:
    - **Tone**: How well the response matches the conversation's historical tone
    - **Relevance**: Semantic similarity between response and recent context
    - **Naturalness**: How natural the response sounds (perplexity-based)
    - **Length**: How appropriate the length is compared to user's typical messages

    Use this endpoint to preview evaluation scores before recording feedback,
    or to evaluate responses from other sources.
    """
    evaluator = get_response_evaluator()
    result = evaluator.evaluate(
        response=request.response,
        context_messages=request.context_messages,
        user_messages=request.user_messages,
    )

    return EvaluationScores(
        tone_score=result.tone_score,
        relevance_score=result.relevance_score,
        naturalness_score=result.naturalness_score,
        length
Repobility — the code-quality scanner for AI-generated software · https://repobility.com
get_network_graph function · python · L67-L93 (27 LOC)
api/routers/graph.py
def get_network_graph(
    include_relationships: list[str] | None = Query(
        default=None,
        description="Filter by relationship types",
    ),
    min_messages: int = Query(default=1, ge=0, description="Minimum messages"),
    days_back: int | None = Query(default=None, ge=1, description="Days of history"),
    max_nodes: int = Query(default=100, ge=1, le=500, description="Max nodes"),
    layout: Literal["force", "hierarchical", "radial"] = Query(
        default="force",
        description="Layout algorithm",
    ),
    include_clusters: bool = Query(default=True, description="Run clustering"),
    width: int = Query(default=800, ge=100, le=4000, description="Layout width"),
    height: int = Query(default=600, ge=100, le=4000, description="Layout height"),
) -> GraphDataSchema:
    """Get the full relationship network graph."""
    return build_network_graph(
        include_relationships=include_relationships,
        min_messages=min_messages,
        days_back=days
get_ego_graph function · python · L105-L121 (17 LOC)
api/routers/graph.py
def get_ego_graph(
    contact_id: str,
    depth: int = Query(default=1, ge=1, le=3, description="Hops from center"),
    max_neighbors: int = Query(default=20, ge=1, le=100, description="Max neighbors"),
    layout: Literal["force", "radial"] = Query(default="radial", description="Layout"),
    width: int = Query(default=800, ge=100, le=4000, description="Layout width"),
    height: int = Query(default=600, ge=100, le=4000, description="Layout height"),
) -> GraphDataSchema:
    """Return the ego-centric graph around one contact.

    Every argument is forwarded unchanged to ``build_ego_graph``.

    Args:
        contact_id: Contact at the center of the graph.
        depth: Hops from the center (1-3, default 1).
        max_neighbors: Maximum neighbors (1-100, default 20).
        layout: Layout algorithm, "force" or "radial" (default "radial").
        width: Layout width (100-4000, default 800).
        height: Layout height (100-4000, default 600).

    Returns:
        The GraphDataSchema produced by ``build_ego_graph``.
    """
    ego_graph = build_ego_graph(
        contact_id=contact_id,
        depth=depth,
        max_neighbors=max_neighbors,
        layout=layout,
        width=width,
        height=height,
    )
    return ego_graph
get_clusters function · python · L159-L169 (11 LOC)
api/routers/graph.py
def get_clusters(
    max_nodes: int = Query(default=100, ge=1, le=500, description="Max nodes"),
    resolution: float = Query(
        default=1.0,
        ge=0.1,
        le=5.0,
        description="Clustering resolution (higher = more clusters)",
    ),
) -> ClusterResultSchema:
    """Return community assignments for contacts.

    Both arguments are forwarded unchanged to ``compute_clusters``.

    Args:
        max_nodes: Maximum nodes to consider (1-500, default 100).
        resolution: Clustering resolution (0.1-5.0, default 1.0); higher
            values yield more clusters.

    Returns:
        The ClusterResultSchema produced by ``compute_clusters``.
    """
    clusters = compute_clusters(max_nodes=max_nodes, resolution=resolution)
    return clusters
get_graph_evolution function · python · L181-L196 (16 LOC)
api/routers/graph.py
def get_graph_evolution(
    from_date: str = Query(description="Start date (YYYY-MM-DD)"),
    to_date: str = Query(description="End date (YYYY-MM-DD)"),
    interval: Literal["day", "week", "month"] = Query(
        default="week",
        description="Snapshot interval",
    ),
    max_nodes: int = Query(default=50, ge=1, le=200, description="Max nodes per snapshot"),
) -> GraphEvolutionResponse:
    """Return how the graph evolves over a time period.

    All arguments are forwarded unchanged to ``compute_graph_evolution``; the
    date strings are not parsed in this handler.

    Args:
        from_date: Start date (YYYY-MM-DD), required.
        to_date: End date (YYYY-MM-DD), required.
        interval: Snapshot interval: "day", "week" or "month" (default "week").
        max_nodes: Maximum nodes per snapshot (1-200, default 50).

    Returns:
        The GraphEvolutionResponse produced by ``compute_graph_evolution``.
    """
    evolution = compute_graph_evolution(
        from_date=from_date,
        to_date=to_date,
        interval=interval,
        max_nodes=max_nodes,
    )
    return evolution
export_graph function · python · L207-L212 (6 LOC)
api/routers/graph.py
def export_graph(
    request: ExportGraphRequest,
    max_nodes: int = Query(default=100, ge=1, le=500, description="Max nodes"),
) -> ExportGraphResponse:
    """Export the relationship graph in a supported format.

    Args:
        request: ExportGraphRequest passed through to ``export_graph_data``.
        max_nodes: Maximum nodes to include (1-500, default 100).

    Returns:
        The ExportGraphResponse produced by ``export_graph_data``.
    """
    exported = export_graph_data(request=request, max_nodes=max_nodes)
    return exported
get_system_status function · python · L54-L82 (29 LOC)
api/routers/health_debug.py
async def get_system_status(request: Request) -> dict[str, Any]:
    process = psutil.Process(os.getpid())
    mem_info = process.memory_info()

    model_loaded = False
    try:
        from models import get_generator

        gen = get_generator()
        model_loaded = gen.is_loaded()
    except (ImportError, AttributeError, RuntimeError) as e:
        logging.getLogger(__name__).debug(f"Model loaded check failed: {e}")

    embedding_available = False
    try:
        from models.bert_embedder import is_mlx_available

        embedding_available = is_mlx_available()
    except (ImportError, AttributeError, RuntimeError) as e:
        logging.getLogger(__name__).debug(f"Embedding availability check failed: {e}")

    return {
        "memory_rss_mb": round(mem_info.rss / BYTES_PER_MB, 1),
        "memory_vms_mb": round(mem_info.vms / BYTES_PER_MB, 1),
        "memory_percent": round(process.memory_percent(), 1),
        "cpu_percent": round(process.cpu_percent(), 1),
        "model
_get_memory_fast function · python · L26-L79 (54 LOC)
api/routers/health_readiness.py
def _get_memory_fast() -> tuple[float, float, float]:
    """Get system memory using native macOS command (much faster than psutil)."""
    try:
        result = subprocess.run(
            ["vm_stat"],
            capture_output=True,
            text=True,
            timeout=2,
        )
        if result.returncode != 0:
            raise RuntimeError("vm_stat failed")

        lines = result.stdout.strip().split("\n")

        # Parse page size: "Mach Virtual Memory Statistics: (page size of 16384 bytes)"
        page_size = 4096
        if len(lines) > 0 and "page size of" in lines[0]:
            match = re.search(r"page size of (\d+) bytes", lines[0])
            if match:
                page_size = int(match.group(1))

        stats = {}
        for line in lines:
            if ":" in line:
                key, value = line.split(":", 1)
                try:
                    stats[key.strip()] = int(value.strip().rstrip("."))
                except ValueError:
           
_get_process_memory function · python · L82-L89 (8 LOC)
api/routers/health_readiness.py
def _get_process_memory() -> tuple[float, float]:
    """Return this process's memory usage as ``(rss_mb, vms_mb)``.

    Best-effort: when the lookup fails, the error is logged and
    ``(0.0, 0.0)`` is returned instead of raising.

    Returns:
        Resident and virtual memory sizes, each divided by ``BYTES_PER_MB``.
    """
    try:
        mem_info = psutil.Process(os.getpid()).memory_info()
        return mem_info.rss / BYTES_PER_MB, mem_info.vms / BYTES_PER_MB
    except (OSError, AttributeError, psutil.Error) as e:
        # psutil signals its own failures (AccessDenied, NoSuchProcess, ...)
        # through psutil.Error, which is not an OSError subclass; without it
        # the fallback below would never be reached for those errors.
        logger.error("Failed to fetch process memory: %s", e)
        return 0.0, 0.0
Source: Repobility analyzer · https://repobility.com
_check_imessage_access function · python · L92-L102 (11 LOC)
api/routers/health_readiness.py
def _check_imessage_access() -> bool:
    """Report whether the iMessage database can be accessed.

    Opens a ``ChatDBReader``, returns the result of its ``check_access()``
    call, and always closes the reader again, including when the check raises.

    Returns:
        The value of ``check_access()``, or False when an ``OSError``
        (including ``PermissionError``) occurs; that failure is logged.
    """
    try:
        from integrations.imessage import ChatDBReader

        reader = ChatDBReader()
        try:
            return reader.check_access()
        finally:
            # Release the reader even if check_access() raises; previously the
            # close() call was skipped on any exception, leaking the reader.
            reader.close()
    except (OSError, PermissionError) as e:
        logger.error("iMessage access check failed: %s", e)
        return False
_get_memory_mode function · python · L105-L110 (6 LOC)
api/routers/health_readiness.py
def _get_memory_mode(available_gb: float) -> str:
    """Map available memory (in GB) to a memory-mode label.

    Args:
        available_gb: Available memory in gigabytes.

    Returns:
        "FULL" at 4.0 GB or more, "LITE" at 2.0 GB or more, otherwise
        "MINIMAL".
    """
    # Thresholds are checked from highest to lowest; the first match wins.
    for minimum_gb, mode in ((4.0, "FULL"), (2.0, "LITE")):
        if available_gb >= minimum_gb:
            return mode
    return "MINIMAL"
_check_model_loaded function · python · L113-L120 (8 LOC)
api/routers/health_readiness.py
def _check_model_loaded() -> bool:
    """Report whether the generator says its model is loaded.

    Best-effort: any exception while importing or querying the generator is
    treated as "not loaded".

    Returns:
        The value of ``get_generator().is_loaded()``, or False on any error.
    """
    try:
        from models import get_generator

        is_loaded = get_generator().is_loaded()
    except Exception:
        # Deliberately broad: this check must never break its caller.
        is_loaded = False
    return is_loaded
_get_model_info function · python · L123-L154 (32 LOC)
api/routers/health_readiness.py
def _get_model_info() -> ModelInfo | None:
    """Return information about the current model, using a cache.

    A cached entry under ``"model_info"`` is returned as-is. Otherwise the
    info is read from the generator's ``_loader`` and stored in the cache.
    ``None`` results are never cached, so they are recomputed on each call.

    Returns:
        ModelInfo for the current model, or None when the generator, its
        loader or the loader's info is missing, or when the lookup raises one
        of the handled exceptions.
    """
    cache = get_model_info_cache()
    hit, cached_info = cache.get("model_info")
    if hit:
        return cached_info  # type: ignore[no-any-return]

    try:
        from models import get_generator

        # Walk generator -> loader -> info, stopping at the first missing link.
        generator = get_generator()
        loader = None if generator is None else getattr(generator, "_loader", None)
        info = None if loader is None else loader.get_current_model_info()
        if info is None:
            return None

        model_info = ModelInfo(
            id=info.get("id"),
            display_name=info.get("display_name", "Unknown"),
            loaded=info.get("loaded", False),
            memory_usage_mb=info.get("memory_usage_mb", 0.0),
            quality_tier=info.get("quality_tier"),
        )
        cache.set("model_info", model_info)
        return model_info
    except (ImportError, AttributeError, KeyError, TypeError, RuntimeError):
        return None
_get_recommended_model function · python · L157-L164 (8 LOC)
api/routers/health_readiness.py
def _get_recommended_model(total_ram_gb: float) -> str | None:
    try:
        from models import get_recommended_model

        spec = get_recommended_model(total_ram_gb)
        return spec.id
    except Exception:
        return None
get_health function · python · L169-L214 (46 LOC)
api/routers/health_readiness.py
async def get_health(request: Request) -> HealthResponse:
    cache = get_health_cache()
    found, cached = cache.get("health_status")
    if found:
        return cached  # type: ignore[no-any-return]

    # Use fast native command instead of slow psutil
    available_gb, used_gb, total_gb = _get_memory_fast()

    jarvis_rss_mb, jarvis_vms_mb = _get_process_memory()
    imessage_access = await run_in_threadpool(_check_imessage_access)
    memory_mode = _get_memory_mode(available_gb)
    model_loaded = _check_model_loaded()
    model_info = _get_model_info()
    recommended_model = _get_recommended_model(total_gb)

    details: dict[str, str] = {}
    if not imessage_access:
        details["imessage"] = "Full Disk Access required"
    if available_gb < 2.0:
        details["memory"] = f"Low memory: {available_gb:.1f}GB available"

    if not imessage_access:
        status = "unhealthy"
    elif available_gb < 2.0:
        status = "degraded"
    else:
        status = "healthy"

  
get_diagnostic function · python · L225-L255 (31 LOC)
api/routers/health_readiness.py
async def get_diagnostic(request: Request) -> dict[str, Any]:
    checks: dict[str, Any] = {}
    issues: list[str] = []

    checks["memory"] = {
        "status": "ok",
        "available_gb": round(psutil.virtual_memory().available / BYTES_PER_GB, 2),
    }

    try:
        from models import get_generator

        gen = get_generator()
        checks["model"] = {"status": "ok", "loaded": gen.is_loaded()}
    except (ImportError, AttributeError, RuntimeError) as e:
        checks["model"] = {"status": "error", "message": str(e)}
        issues.append(f"Model check failed: {e}")

    try:
        from integrations.imessage import ChatDBReader

        reader = ChatDBReader()
        _ = len(reader.get_conversations(limit=1))
        reader.close()
        checks["database"] = {"status": "ok", "accessible": True}
    except (OSError, PermissionError, RuntimeError) as e:
        checks["database"] = {"status": "error", "message": str(e)}
        issues.append(f"Database access failed:
_format_prometheus_metric function · python · L30-L60 (31 LOC)
api/routers/metrics.py
def _format_prometheus_metric(
    name: str,
    value: float | int,
    labels: dict[str, str] | None = None,
    metric_type: str = "gauge",
    help_text: str = "",
) -> str:
    """Format a metric in Prometheus text format.

    Args:
        name: Metric name
        value: Metric value
        labels: Optional labels
        metric_type: Type (gauge, counter, histogram)
        help_text: Help text

    Returns:
        Prometheus-formatted metric string
    """
    lines = []
    if help_text:
        lines.append(f"# HELP {name} {help_text}")
    lines.append(f"# TYPE {name} {metric_type}")

    if labels:
        label_str = ",".join(f'{k}="{v}"' for k, v in labels.items())
        lines.append(f"{name}{{{label_str}}} {value}")
    else:
        lines.append(f"{name} {value}")

    return "\n".join(lines)
Same scanner, your repo: https://repobility.com — Repobility
get_prometheus_metrics function · python · L65-L163 (99 LOC)
api/routers/metrics.py
async def get_prometheus_metrics(request: Request) -> str:
    """Get all metrics in Prometheus text format.

    Returns metrics compatible with Prometheus scraping:
    - jarvis_memory_rss_bytes: Current RSS memory usage
    - jarvis_memory_vms_bytes: Current VMS memory usage
    - jarvis_memory_available_bytes: System available memory
    - jarvis_requests_total: Total request count by endpoint
    - jarvis_request_duration_seconds: Request latency histogram
    """
    lines: list[str] = []

    # Process memory metrics
    process = psutil.Process(os.getpid())
    mem_info = process.memory_info()
    system_mem = psutil.virtual_memory()

    lines.append(
        _format_prometheus_metric(
            "jarvis_memory_rss_bytes",
            mem_info.rss,
            metric_type="gauge",
            help_text="Resident Set Size in bytes",
        )
    )
    lines.append(
        _format_prometheus_metric(
            "jarvis_memory_vms_bytes",
            mem_info.vms,
            
get_memory_metrics function · python · L168-L220 (53 LOC)
api/routers/metrics.py
async def get_memory_metrics(request: Request) -> dict[str, Any]:
    """Get detailed memory breakdown.

    Returns:
        Detailed memory statistics including:
        - Current process memory (RSS, VMS)
        - System memory (total, available, used)
        - Memory sampling history stats
        - Metal GPU memory (if available)
    """
    process = psutil.Process(os.getpid())
    mem_info = process.memory_info()
    system_mem = psutil.virtual_memory()

    sampler = get_memory_sampler()
    sample_stats = sampler.get_stats()

    # Try to get Metal memory if MLX is available
    metal_mb = 0.0
    try:
        import mlx.core as mx

        metal_mb = mx.metal.get_peak_memory() / BYTES_PER_MB
    except (ImportError, AttributeError):
        pass

    # Get recent samples for trend data
    recent_samples = sampler.get_samples()
    trend_data = []
    for sample in recent_samples[-60:]:  # Last 60 samples
        trend_data.append(
            {
                "timestamp":
get_latency_metrics function · python · L225-L260 (36 LOC)
api/routers/metrics.py
async def get_latency_metrics(request: Request) -> dict[str, Any]:
    """Get request latency percentiles.

    Returns:
        Latency statistics by operation including:
        - Count of observations
        - Mean, min, max latency
        - p50, p90, p95, p99 percentiles
    """
    histogram = get_latency_histogram()
    all_stats = histogram.get_stats()

    result: dict[str, Any] = {"operations": {}}

    for operation, stats in all_stats.items():
        percentiles = histogram.get_percentiles(operation)
        result["operations"][operation] = {
            "count": stats["count"],
            "mean_ms": stats["mean_ms"],
            "p50_ms": round(percentiles["p50"] * 1000, 3),
            "p90_ms": round(percentiles["p90"] * 1000, 3),
            "p95_ms": round(percentiles["p95"] * 1000, 3),
            "p99_ms": round(percentiles["p99"] * 1000, 3),
        }

    # Add summary stats
    counter = get_request_counter()
    counter_stats = counter.get_stats()
    result[
get_request_metrics function · python · L265-L275 (11 LOC)
api/routers/metrics.py
async def get_request_metrics(request: Request) -> dict[str, Any]:
    """Report request counts collected by the request counter.

    Args:
        request: Incoming request object; not read by this handler's body.

    Returns:
        A dict with "endpoints" (the counter's ``get_all()`` result) and
        "stats" (the counter's ``get_stats()`` result).
    """
    request_counter = get_request_counter()
    per_endpoint = request_counter.get_all()
    summary = request_counter.get_stats()
    return {"endpoints": per_endpoint, "stats": summary}
trigger_gc function · python · L280-L286 (7 LOC)
api/routers/metrics.py
async def trigger_gc(request: Request) -> dict[str, Any]:
    """Run a forced garbage collection and hand back its report.

    Args:
        request: Incoming request object; not read by this handler's body.

    Returns:
        Whatever ``force_gc()`` returns, unchanged (described in this file as
        memory usage before and after GC, plus objects collected).
    """
    gc_report = force_gc()
    return gc_report
take_memory_sample function · python · L291-L305 (15 LOC)
api/routers/metrics.py
async def take_memory_sample(request: Request) -> dict[str, Any]:
    """Take a memory sample right now and return it as a plain dict.

    Args:
        request: Incoming request object; not read by this handler's body.

    Returns:
        A dict with the sample's ISO-formatted "timestamp" and its "rss_mb",
        "vms_mb", "percent" and "available_gb" values rounded to two
        decimal places.
    """
    snapshot = get_memory_sampler().sample_now()

    payload: dict[str, Any] = {"timestamp": snapshot.timestamp.isoformat()}
    # Every numeric field is reported with the same two-decimal rounding.
    for field in ("rss_mb", "vms_mb", "percent", "available_gb"):
        payload[field] = round(getattr(snapshot, field), 2)
    return payload
reset_metrics function · python · L310-L322 (13 LOC)
api/routers/metrics.py
async def reset_metrics(request: Request) -> dict[str, str]:
    """Reset the request counter and the latency histogram.

    The memory sampler is not touched.

    Args:
        request: Incoming request object; not read by this handler's body.

    Returns:
        A fixed confirmation payload with "status" and "message" keys.
    """
    # Both collectors are obtained before either is reset, counter first.
    collectors = (get_request_counter(), get_latency_histogram())
    for collector in collectors:
        collector.reset()

    confirmation = {"status": "ok", "message": "Metrics counters reset"}
    return confirmation
get_priority_inbox function · python · L367-L438 (72 LOC)
api/routers/priority.py
def get_priority_inbox(
    limit: Annotated[
        int,
        Query(
            ge=1,
            le=200,
            description="Maximum number of messages to analyze",
        ),
    ] = 50,
    include_handled: Annotated[
        bool,
        Query(
            description="Whether to include already-handled messages",
        ),
    ] = False,
    min_level: Annotated[
        str | None,
        Query(
            description="Minimum priority level to include (critical, high, medium, low)",
        ),
    ] = None,
    reader: ChatDBReader = Depends(get_imessage_reader),
) -> PriorityInboxResponse:
    """Get prioritized messages sorted by importance.

    Returns messages from recent conversations scored and sorted by priority.
    Messages are analyzed for questions, action items, time-sensitive content,
    and sender importance.

    **Priority Levels:**
    - **critical**: Score >= 0.8 - Urgent, needs immediate attention
    - **high**: Score >= 0.6 - Important, shou
Provenance: Repobility (https://repobility.com) — every score reproducible from /scan/
mark_handled function · python · L462-L489 (28 LOC)
api/routers/priority.py
def mark_handled(request: MarkHandledRequest) -> MarkHandledResponse:
    """Flag a message as handled.

    Intended for messages the user has already dealt with (responded, noted,
    etc.) so that they drop out of the priority inbox.

    **Example Request:**
    ```json
    {
        "chat_id": "chat123456",
        "message_id": 12345
    }
    ```

    Args:
        request: MarkHandledRequest carrying chat_id and message_id

    Returns:
        MarkHandledResponse echoing the ids with success=True, handled=True
    """
    chat_id = request.chat_id
    message_id = request.message_id

    mark_handled_state(chat_id=chat_id, message_id=message_id, handled=True)

    response = MarkHandledResponse(
        success=True,
        chat_id=chat_id,
        message_id=message_id,
        handled=True,
    )
    return response
unmark_handled function · python · L513-L539 (27 LOC)
api/routers/priority.py
def unmark_handled(request: MarkHandledRequest) -> MarkHandledResponse:
    """Clear the handled flag on a message.

    Intended for putting a message back into the priority inbox.

    **Example Request:**
    ```json
    {
        "chat_id": "chat123456",
        "message_id": 12345
    }
    ```

    Args:
        request: MarkHandledRequest carrying chat_id and message_id

    Returns:
        MarkHandledResponse echoing the ids with success=True, handled=False
    """
    chat_id = request.chat_id
    message_id = request.message_id

    mark_handled_state(chat_id=chat_id, message_id=message_id, handled=False)

    response = MarkHandledResponse(
        success=True,
        chat_id=chat_id,
        message_id=message_id,
        handled=False,
    )
    return response
mark_contact_important function · python · L562-L588 (27 LOC)
api/routers/priority.py
def mark_contact_important(request: ImportantContactRequest) -> ImportantContactResponse:
    """Set or clear the "important" flag for a contact.

    Messages from important contacts receive higher priority scores, so this
    is the way to keep key people at the top of the inbox.

    **Example Request:**
    ```json
    {
        "identifier": "+1234567890",
        "important": true
    }
    ```

    Args:
        request: ImportantContactRequest carrying identifier and importance flag

    Returns:
        ImportantContactResponse echoing the request values with success=True
    """
    identifier = request.identifier
    important = request.important

    mark_contact_importance(identifier=identifier, important=important)

    response = ImportantContactResponse(
        success=True,
        identifier=identifier,
        important=important,
    )
    return response
‹ prevpage 3 / 20next ›