← back to jwalin-shah__jarvis-ai-assistant

Function bodies 1,000 total

All specs Real LLM only Function bodies
get_or_create_ttl_cache function · python · L14-L46 (33 LOC)
api/cache_utils.py
def get_or_create_ttl_cache(
    cache_ref: list[TTLCache | None],
    lock: threading.Lock,
    ttl_seconds: float = 300.0,
    maxsize: int = 100,
) -> TTLCache | None:
    """Get or create a TTL cache with thread-safe initialization.

    Uses double-checked locking pattern for efficient thread-safe initialization.

    Args:
        cache_ref: List containing the cache reference (list allows mutation in closure).
        lock: Lock object for synchronization.
        ttl_seconds: Time-to-live for cached entries in seconds.
        maxsize: Maximum number of entries in the cache.

    Returns:
        The TTLCache instance.

    Example:
        ```python
        _cache: list[TTLCache | None] = [None]
        _cache_lock = threading.Lock()

        def get_cache() -> TTLCache:
            return get_or_create_ttl_cache(_cache, _cache_lock, ttl_seconds=300.0)
        ```
    """
    if cache_ref[0] is None:
        with lock:
            if cache_ref[0] is None:
                cache
CacheManager.get_cache method · python · L60-L81 (22 LOC)
api/cache_utils.py
    def get_cache(
        self,
        name: str,
        ttl_seconds: float = 300.0,
        maxsize: int = 100,
    ) -> TTLCache:
        """Get or create a named cache.

        Args:
            name: Unique name for the cache.
            ttl_seconds: Time-to-live for cached entries.
            maxsize: Maximum number of entries.

        Returns:
            The TTLCache instance.
        """
        if name not in self._caches:
            with self._global_lock:
                if name not in self._caches:
                    self._caches[name] = TTLCache(ttl_seconds=ttl_seconds, maxsize=maxsize)
                    self._locks[name] = threading.Lock()
        return self._caches[name]
CacheManager.clear_cache method · python · L83-L96 (14 LOC)
api/cache_utils.py
    def clear_cache(self, name: str) -> bool:
        """Clear a specific cache by name.

        Args:
            name: Name of the cache to clear.

        Returns:
            True if cache was found and cleared, False otherwise.
        """
        if name in self._caches:
            with self._locks[name]:
                self._caches[name].clear()
            return True
        return False
CacheManager.clear_all method · python · L98-L103 (6 LOC)
api/cache_utils.py
    def clear_all(self) -> None:
        """Clear all managed caches."""
        with self._global_lock:
            for name, cache in self._caches.items():
                with self._locks[name]:
                    cache.clear()
CacheManager.get_stats method · python · L105-L119 (15 LOC)
api/cache_utils.py
    def get_stats(self) -> dict[str, dict[str, Any]]:
        """Get statistics for all caches.

        Returns:
            Dictionary mapping cache names to their statistics.
        """
        stats = {}
        for name, cache in self._caches.items():
            cache_stats = cache.stats()
            stats[name] = {
                "size": cache_stats["size"],
                "maxsize": cache_stats["maxsize"],
                "ttl_seconds": cache_stats["ttl_seconds"],
            }
        return stats
get_cache_manager function · python · L127-L138 (12 LOC)
api/cache_utils.py
def get_cache_manager() -> CacheManager:
    """Get the global cache manager instance.

    Returns:
        The global CacheManager singleton.
    """
    global _global_cache_manager
    if _global_cache_manager is None:
        with _global_manager_lock:
            if _global_cache_manager is None:
                _global_cache_manager = CacheManager()
    return _global_cache_manager
get_imessage_reader function · python · L17-L49 (33 LOC)
api/dependencies.py
def get_imessage_reader() -> Iterator[ChatDBReader]:
    """Get a thread-safe iMessage reader for the current request.

    Uses a connection pool to acquire a fresh connection for each request.
    The connection is automatically released back to the pool when the
    request is finished.

    Yields:
        ChatDBReader instance

    Raises:
        HTTPException: 403 if Full Disk Access is not granted
    """
    reader = ChatDBReader()

    found, has_access = _access_cache.get("imessage_access")
    if not found:
        has_access = reader.check_access()
        if has_access:
            _access_cache.set("imessage_access", True)
        else:
            _access_cache.invalidate("imessage_access")

    if not has_access:
        reader.close()
        from jarvis.core.exceptions import imessage_permission_denied

        raise imessage_permission_denied()

    try:
        yield reader
    finally:
        reader.close()
Repobility · code-quality intelligence platform · https://repobility.com
get_status_code_for_error function · python · L94-L113 (20 LOC)
api/errors.py
def get_status_code_for_error(error: JarvisError) -> int:
    """Determine the appropriate HTTP status code for an error.

    Priority order:
    1. Error-code-level override in ERROR_CODE_STATUS_CODES
    2. The exception class's `status_code` attribute (defined on each JarvisError subclass)
    3. Default 500

    Args:
        error: The JARVIS error instance.

    Returns:
        HTTP status code (400-599).
    """
    # Check error-code-level overrides first (e.g. MDL_NOT_FOUND -> 404)
    if error.code in ERROR_CODE_STATUS_CODES:
        return ERROR_CODE_STATUS_CODES[error.code]

    # Use the status_code defined on the exception class
    return getattr(error, "status_code", 500)
build_error_response function · python · L116-L135 (20 LOC)
api/errors.py
def build_error_response(error: JarvisError) -> dict[str, Any]:
    """Build a standardized error response dictionary.

    Args:
        error: The JARVIS error instance.

    Returns:
        Dictionary with error, code, detail, and optional details fields.
    """
    response: dict[str, Any] = {
        "error": error.__class__.__name__,
        "code": error.code.value,
        "detail": error.message,
    }

    # Include additional details if present
    if error.details:
        response["details"] = error.details

    return response
jarvis_error_handler function · python · L138-L179 (42 LOC)
api/errors.py
async def jarvis_error_handler(request: Request, exc: JarvisError) -> JSONResponse:
    """Handle JarvisError and subclasses.

    Converts JARVIS errors to appropriate HTTP responses with
    standardized error format.

    Args:
        request: The FastAPI request object.
        exc: The JARVIS error that was raised.

    Returns:
        JSONResponse with appropriate status code and error body.
    """
    status_code = get_status_code_for_error(exc)
    response_body = build_error_response(exc)

    # Log the error with appropriate level
    if status_code >= 500:
        logger.error(
            "Server error: %s (code=%s, status=%d)",
            exc.message,
            exc.code.value,
            status_code,
            exc_info=exc.cause if exc.cause else exc,
        )
    else:
        logger.warning(
            "Client error: %s (code=%s, status=%d)",
            exc.message,
            exc.code.value,
            status_code,
        )

    headers = {}
    if status
validation_error_handler function · python · L182-L206 (25 LOC)
api/errors.py
async def validation_error_handler(request: Request, exc: ValidationError) -> JSONResponse:
    """Handle ValidationError with additional field information.

    Provides more detailed error responses for validation failures.

    Args:
        request: The FastAPI request object.
        exc: The validation error that was raised.

    Returns:
        JSONResponse with 400 status and validation details.
    """
    response_body = build_error_response(exc)

    logger.debug(
        "Validation error on %s %s: %s",
        request.method,
        request.url.path,
        exc.message,
    )

    return JSONResponse(
        status_code=400,
        content=response_body,
    )
imessage_access_error_handler function · python · L209-L233 (25 LOC)
api/errors.py
async def imessage_access_error_handler(request: Request, exc: iMessageAccessError) -> JSONResponse:
    """Handle iMessageAccessError with permission instructions.

    Provides helpful instructions for resolving permission issues.

    Args:
        request: The FastAPI request object.
        exc: The iMessage access error that was raised.

    Returns:
        JSONResponse with 403 status and permission instructions.
    """
    response_body = build_error_response(exc)

    logger.warning(
        "iMessage access denied for %s %s: %s",
        request.method,
        request.url.path,
        exc.message,
    )

    return JSONResponse(
        status_code=403,
        content=response_body,
    )
model_load_error_handler function · python · L236-L260 (25 LOC)
api/errors.py
async def model_load_error_handler(request: Request, exc: ModelLoadError) -> JSONResponse:
    """Handle ModelLoadError with service availability information.

    Args:
        request: The FastAPI request object.
        exc: The model load error that was raised.

    Returns:
        JSONResponse with 503 status.
    """
    response_body = build_error_response(exc)

    logger.error(
        "Model load failed for %s %s: %s",
        request.method,
        request.url.path,
        exc.message,
        exc_info=exc.cause if exc.cause else exc,
    )

    return JSONResponse(
        status_code=503,
        content=response_body,
        headers={"Retry-After": "30"},  # Suggest retry after 30 seconds
    )
resource_error_handler function · python · L263-L286 (24 LOC)
api/errors.py
async def resource_error_handler(request: Request, exc: ResourceError) -> JSONResponse:
    """Handle ResourceError with service availability information.

    Args:
        request: The FastAPI request object.
        exc: The resource error that was raised.

    Returns:
        JSONResponse with 503 status.
    """
    response_body = build_error_response(exc)

    logger.error(
        "Resource error for %s %s: %s",
        request.method,
        request.url.path,
        exc.message,
    )

    return JSONResponse(
        status_code=503,
        content=response_body,
        headers={"Retry-After": "60"},  # Suggest retry after 60 seconds
    )
timeout_error_handler function · python · L289-L318 (30 LOC)
api/errors.py
async def timeout_error_handler(request: Request, exc: TimeoutError) -> JSONResponse:
    """Handle timeout errors with 408 Request Timeout.

    This handler catches asyncio.TimeoutError and similar timeout exceptions
    and returns an appropriate 408 response.

    Args:
        request: The FastAPI request object.
        exc: The timeout error that was raised.

    Returns:
        JSONResponse with 408 status and retry information.
    """
    logger.warning(
        "Request timeout for %s %s",
        request.method,
        request.url.path,
    )

    response_body = {
        "error": "RequestTimeout",
        "code": "REQUEST_TIMEOUT",
        "detail": "The request timed out. Please try again.",
    }

    return JSONResponse(
        status_code=408,
        content=response_body,
        headers={"Retry-After": "5"},  # Suggest retry after 5 seconds
    )
Open data scored by Repobility · https://repobility.com
generic_exception_handler function · python · L321-L350 (30 LOC)
api/errors.py
async def generic_exception_handler(request: Request, exc: Exception) -> JSONResponse:
    """Handle unexpected exceptions with a generic error response.

    This is a catch-all handler for exceptions that aren't JarvisError
    subclasses. It logs the full exception and returns a safe error message.

    Args:
        request: The FastAPI request object.
        exc: The unexpected exception.

    Returns:
        JSONResponse with 500 status and generic error message.
    """
    logger.exception(
        "Unexpected error handling %s %s: %s",
        request.method,
        request.url.path,
        str(exc),
    )

    response_body = {
        "error": "InternalError",
        "code": "INTERNAL_ERROR",
        "detail": "An unexpected error occurred. Please try again later.",
    }

    return JSONResponse(
        status_code=500,
        content=response_body,
    )
register_exception_handlers function · python · L353-L382 (30 LOC)
api/errors.py
def register_exception_handlers(app: FastAPI) -> None:
    """Register all JARVIS exception handlers with a FastAPI app.

    This function should be called once when setting up the FastAPI app
    to ensure all JARVIS errors are handled consistently.

    Args:
        app: The FastAPI application instance.

    Example:
        app = FastAPI()
        register_exception_handlers(app)
    """
    # Register specific handlers first (most specific to least specific)
    # Note: type: ignore is needed due to Starlette's overly strict typing
    app.add_exception_handler(ValidationError, validation_error_handler)  # type: ignore[arg-type]
    app.add_exception_handler(iMessageAccessError, imessage_access_error_handler)  # type: ignore[arg-type]
    app.add_exception_handler(ModelLoadError, model_load_error_handler)  # type: ignore[arg-type]
    app.add_exception_handler(ResourceError, resource_error_handler)  # type: ignore[arg-type]

    # Register the base JarvisError handler (catches a
lifespan function · python · L90-L126 (37 LOC)
api/main.py
async def lifespan(app_instance: FastAPI) -> AsyncIterator[None]:
    """Lifecycle event handler for the FastAPI application."""
    # Start model warmer
    import asyncio

    from jarvis.interfaces.desktop.server import JarvisSocketServer
    from jarvis.model_warmer import get_model_warmer
    from jarvis.tasks.worker import start_worker, stop_worker

    # Initialize socket server (direct desktop communication)
    # We run it in the same process to share the GPU memory pool
    socket_server = JarvisSocketServer(
        enable_watcher=True,
        preload_models=False,  # API already warms models
        enable_prefetch=True,
    )

    # Start socket server in background
    socket_task = asyncio.create_task(socket_server.serve_forever())
    app_instance.state.socket_server = socket_server

    get_model_warmer().start()
    start_worker()

    yield

    # Stop services
    stop_worker()
    get_model_warmer().stop()

    # Stop socket server
    await socket_server.stop()
 
_create_openapi_generator function · python · L262-L303 (42 LOC)
api/main.py
def _create_openapi_generator(app_instance: FastAPI) -> callable:  # type: ignore[valid-type]
    """Create a custom OpenAPI schema generator for the given app instance."""

    def custom_openapi() -> dict[str, Any]:
        """Generate custom OpenAPI schema with enhanced documentation."""
        if app_instance.openapi_schema:
            return app_instance.openapi_schema

        openapi_schema = get_openapi(
            title=API_TITLE,
            version=API_VERSION,
            description=API_DESCRIPTION,
            routes=app_instance.routes,
            tags=API_TAGS_METADATA,
        )

        # Add contact and license info
        openapi_schema["info"]["contact"] = API_CONTACT
        openapi_schema["info"]["license"] = API_LICENSE

        # Add server information
        openapi_schema["servers"] = [
            {
                "url": "http://localhost:8742",
                "description": "Local development server",
            },
            {
                "ur
_configure_middleware function · python · L306-L354 (49 LOC)
api/main.py
def _configure_middleware(app_instance: FastAPI) -> None:
    """Configure middleware for the FastAPI application."""
    # Configure CORS for Tauri and development
    app_instance.add_middleware(
        CORSMiddleware,
        allow_origins=[
            "tauri://localhost",  # Tauri production
            "http://localhost:5173",  # Vite dev server
            "http://localhost:1420",  # Tauri dev server
            "http://127.0.0.1:5173",
            "http://127.0.0.1:1420",
        ],
        allow_credentials=True,
        allow_methods=["*"],
        allow_headers=["*"],
    )

    # Enable GZip compression for responses (50-75% bandwidth savings for large JSON responses)
    app_instance.add_middleware(GZipMiddleware, minimum_size=500)

    # Rate limiting middleware (applies default_limits to all routes)
    app_instance.add_middleware(SlowAPIMiddleware)

    # Request timing middleware
    @app_instance.middleware("http")
    async def metrics_middleware(request: Request, c
_register_routers function · python · L357-L418 (62 LOC)
api/main.py
def _register_routers(app_instance: FastAPI) -> None:
    """Register all API routers explicitly.

    Imports routers on-demand to avoid eager loading at module import time.
    This improves testability and reduces import-time coupling.
    """
    # Import routers locally to avoid module-level coupling
    from api.routers.analytics import router as analytics_router
    from api.routers.attachments import router as attachments_router
    from api.routers.batch import router as batch_router
    from api.routers.calendar import router as calendar_router
    from api.routers.contacts import router as contacts_router
    from api.routers.conversations import router as conversations_router
    from api.routers.custom_templates import router as custom_templates_router
    from api.routers.drafts import router as drafts_router

    # from api.routers.embeddings import router as embeddings_router  # Missing
    from api.routers.experiments import router as experiments_router
    from api.ro
create_app function · python · L421-L462 (42 LOC)
api/main.py
def create_app() -> FastAPI:
    """Application factory for creating configured FastAPI instances.

    This factory pattern allows:
    - Explicit router registration without import-time coupling
    - Easy creation of test apps with subset of routers
    - Better testability and modularity

    Returns:
        Configured FastAPI application instance
    """
    # Create FastAPI app
    app_instance = FastAPI(
        title=API_TITLE,
        description=API_DESCRIPTION,
        version=API_VERSION,
        docs_url="/docs",
        redoc_url="/redoc",
        openapi_url="/openapi.json",
        openapi_tags=API_TAGS_METADATA,
        contact=API_CONTACT,
        license_info=API_LICENSE,
        lifespan=lifespan,
    )

    # Configure rate limiting
    app_instance.state.limiter = limiter
    app_instance.add_exception_handler(RateLimitExceeded, rate_limit_exceeded_handler)  # type: ignore[arg-type]

    # Use custom OpenAPI schema
    app_instance.openapi = _create_openapi_gener
_get_cached_config_value function · python · L44-L67 (24 LOC)
api/ratelimit.py
def _get_cached_config_value(key: str, getter: Callable[[], Any], default: Any) -> Any:
    """Get config value with caching.

    Args:
        key: Cache key
        getter: Function to get the value if cache miss
        default: Default value if getter fails

    Returns:
        Config value
    """
    now = time.time()
    if key in _config_cache:
        value, timestamp = _config_cache[key]
        if now - timestamp < _CONFIG_CACHE_TTL:
            return value

    try:
        value = getter()
        _config_cache[key] = (value, now)
        return value
    except Exception:
        logger.debug("Config lookup failed for %s, using default", key)
        return default
Repobility — same analyzer, your code, free for public repos · /scan/
get_remote_address function · python · L70-L91 (22 LOC)
api/ratelimit.py
def get_remote_address(request: Request) -> str:
    """Get client identifier for rate limiting.

    For local-first apps, we use a combination of IP and user-agent
    to differentiate between different local clients.

    Args:
        request: The FastAPI request object.

    Returns:
        String identifier for the client.
    """
    # Use slowapi's default for IP
    ip = _get_remote_address(request) or "unknown"

    # For localhost, add user-agent to differentiate clients
    if ip in ("127.0.0.1", "localhost", "::1"):
        user_agent = request.headers.get("user-agent", "unknown")
        # Hash the user agent for privacy
        return f"{ip}:{hash(user_agent) % 10000}"

    return ip
rate_limit_exceeded_handler function · python · L109-L156 (48 LOC)
api/ratelimit.py
def rate_limit_exceeded_handler(request: Request, exc: RateLimitExceeded) -> Response:
    """Handle rate limit exceeded errors with proper 429 response.

    Args:
        request: The FastAPI request object.
        exc: The rate limit exception.

    Returns:
        JSON response with 429 status and retry-after header.
    """
    from fastapi.responses import JSONResponse

    # Extract retry-after from the exception if available
    retry_after = 60  # Default to 60 seconds
    if hasattr(exc, "detail") and exc.detail:
        # Try to parse the limit from the detail message
        try:
            # Detail format: "Rate limit exceeded: X per Y minute"
            parts = str(exc.detail).split()
            for i, part in enumerate(parts):
                if part.isdigit() and i + 1 < len(parts):
                    if "minute" in parts[i + 1]:
                        retry_after = 60
                    elif "second" in parts[i + 1]:
                        retry_after = int(pa
with_timeout function · python · L159-L192 (34 LOC)
api/ratelimit.py
def with_timeout(timeout_seconds: float) -> Callable[[F], F]:
    """Decorator to add timeout to async endpoints.

    Args:
        timeout_seconds: Maximum time in seconds before timeout.

    Returns:
        Decorator function.

    Usage:
        @with_timeout(30.0)
        async def my_endpoint():
            ...
    """

    def decorator(func: F) -> F:
        @wraps(func)
        async def wrapper(*args: Any, **kwargs: Any) -> Any:
            try:
                return await asyncio.wait_for(
                    func(*args, **kwargs),
                    timeout=timeout_seconds,
                )
            except TimeoutError:
                from fastapi import HTTPException

                raise HTTPException(
                    status_code=408,
                    detail=f"Request timed out after {timeout_seconds} seconds",
                ) from None

        return wrapper  # type: ignore[return-value]

    return decorator
run_in_threadpool function · python · L195-L221 (27 LOC)
api/ratelimit.py
def run_in_threadpool(func: Callable[..., Any]) -> Callable[..., Awaitable[Any]]:
    """Decorator to run a synchronous function in a thread pool.

    Use this for CPU-bound operations like model generation to avoid
    blocking the event loop.

    Args:
        func: Synchronous function to wrap.

    Returns:
        Async function that runs the original in a thread pool.

    Usage:
        @run_in_threadpool
        def cpu_intensive_work():
            ...

        async def endpoint():
            result = await cpu_intensive_work()
    """
    from starlette.concurrency import run_in_threadpool as starlette_threadpool

    @wraps(func)
    async def wrapper(*args: Any, **kwargs: Any) -> Any:
        return await starlette_threadpool(func, *args, **kwargs)

    return wrapper
get_timeout_generation function · python · L224-L238 (15 LOC)
api/ratelimit.py
def get_timeout_generation() -> float:
    """Get generation timeout from config.

    Uses caching to avoid reading config file on every call.

    Returns:
        Timeout in seconds for generation operations.
    """
    return float(
        _get_cached_config_value(
            "timeout_generation",
            lambda: get_config().rate_limit.generation_timeout_seconds,
            30.0,  # Default fallback
        )
    )
get_timeout_read function · python · L241-L255 (15 LOC)
api/ratelimit.py
def get_timeout_read() -> float:
    """Get read timeout from config.

    Uses caching to avoid reading config file on every call.

    Returns:
        Timeout in seconds for read operations.
    """
    return float(
        _get_cached_config_value(
            "timeout_read",
            lambda: get_config().rate_limit.read_timeout_seconds,
            10.0,  # Default fallback
        )
    )
get_analytics_cache function · python · L91-L101 (11 LOC)
api/routers/analytics.py
def get_analytics_cache() -> TTLCache:
    """Get analytics cache with 5-minute TTL.

    Uses double-checked locking pattern for thread safety.
    """
    global _analytics_cache
    if _analytics_cache is None:
        with _analytics_cache_lock:
            if _analytics_cache is None:
                _analytics_cache = TTLCache(ttl_seconds=300.0, maxsize=100)
    return _analytics_cache
_get_time_range_start function · python · L104-L113 (10 LOC)
api/routers/analytics.py
def _get_time_range_start(time_range: TimeRangeEnum) -> datetime | None:
    """Get start datetime for time range."""
    now = datetime.now(UTC)
    if time_range == TimeRangeEnum.WEEK:
        return now - timedelta(days=7)
    elif time_range == TimeRangeEnum.MONTH:
        return now - timedelta(days=30)
    elif time_range == TimeRangeEnum.THREE_MONTHS:
        return now - timedelta(days=90)
    return None  # all_time
Generated by Repobility's multi-pass static-analysis pipeline (https://repobility.com)
get_analytics_overview function · python · L122-L173 (52 LOC)
api/routers/analytics.py
async def get_analytics_overview(
    request: Request,
    time_range: TimeRangeEnum = Query(
        default=TimeRangeEnum.MONTH,
        description="Time range for analytics",
    ),
    reader: ChatDBReader = Depends(get_imessage_reader),
) -> dict[str, Any]:
    """Get comprehensive overview metrics for the analytics dashboard.

    Returns aggregated statistics including:
    - Total messages (sent/received)
    - Active conversations count
    - Average messages per day
    - Average response time
    - Overall sentiment
    - Peak activity times
    - Period comparison (vs previous period)

    **Example Response:**
    ```json
    {
        "total_messages": 5000,
        "sent_messages": 2500,
        "received_messages": 2500,
        "active_conversations": 45,
        "avg_messages_per_day": 50.5,
        "avg_response_time_minutes": 12.5,
        "sentiment": {"score": 0.35, "label": "positive"},
        "peak_hour": 14,
        "peak_day": "Wednesday",
        "period_c
get_analytics_timeline function · python · L182-L241 (60 LOC)
api/routers/analytics.py
async def get_analytics_timeline(
    request: Request,
    granularity: str = Query(
        default="day",
        pattern="^(hour|day|week|month)$",
        description="Time granularity (hour, day, week, month)",
    ),
    time_range: TimeRangeEnum = Query(
        default=TimeRangeEnum.MONTH,
        description="Time range for data",
    ),
    metric: str = Query(
        default="messages",
        pattern="^(messages|sentiment|response_time)$",
        description="Metric to track",
    ),
    reader: ChatDBReader = Depends(get_imessage_reader),
) -> dict[str, Any]:
    """Get time-series data for chart visualization.

    Supports multiple granularities and metrics for flexible charting.

    **Granularities:**
    - `hour`: Hourly breakdown (24 data points)
    - `day`: Daily counts
    - `week`: Weekly aggregates
    - `month`: Monthly aggregates

    **Metrics:**
    - `messages`: Message counts (total, sent, received)
    - `sentiment`: Average sentiment scores
    - `re
get_activity_heatmap function · python · L250-L297 (48 LOC)
api/routers/analytics.py
async def get_activity_heatmap(
    request: Request,
    time_range: TimeRangeEnum = Query(
        default=TimeRangeEnum.THREE_MONTHS,
        description="Time range for heatmap",
    ),
    reader: ChatDBReader = Depends(get_imessage_reader),
) -> dict[str, Any]:
    """Get GitHub-style activity heatmap data.

    Returns daily activity levels for calendar visualization.

    **Activity Levels:**
    - 0: No activity
    - 1: Low (1-5 messages)
    - 2: Medium (6-15 messages)
    - 3: High (16-30 messages)
    - 4: Very high (31+ messages)

    **Example Response:**
    ```json
    {
        "data": [
            {"date": "2024-01-15", "count": 45, "level": 4},
            {"date": "2024-01-16", "count": 12, "level": 2}
        ],
        "stats": {
            "total_days": 90,
            "active_days": 75,
            "max_count": 85,
            "avg_count": 25.5
        }
    }
    ```
    """
    cache_key = f"heatmap:{time_range.value}"
    cache = get_analytics_cache()

   
get_contact_stats function · python · L306-L362 (57 LOC)
api/routers/analytics.py
async def get_contact_stats(
    request: Request,
    chat_id: str,
    time_range: TimeRangeEnum = Query(
        default=TimeRangeEnum.MONTH,
        description="Time range for statistics",
    ),
    reader: ChatDBReader = Depends(get_imessage_reader),
) -> dict[str, Any]:
    """Get detailed analytics for a specific contact.

    Returns comprehensive statistics including:
    - Message counts and balance
    - Response time metrics
    - Sentiment analysis
    - Activity patterns
    - Engagement score

    **Example Response:**
    ```json
    {
        "contact_id": "chat123456",
        "contact_name": "John Doe",
        "total_messages": 500,
        "sent_count": 245,
        "received_count": 255,
        "avg_response_time_minutes": 8.5,
        "sentiment_score": 0.42,
        "engagement_score": 78.5,
        "message_trend": "increasing",
        "hourly_distribution": {...},
        "daily_distribution": {...}
    }
    ```
    """
    cache_key = f"contact:{chat_id}
get_contacts_leaderboard function · python · L371-L428 (58 LOC)
api/routers/analytics.py
async def get_contacts_leaderboard(
    request: Request,
    time_range: TimeRangeEnum = Query(
        default=TimeRangeEnum.MONTH,
        description="Time range for ranking",
    ),
    limit: int = Query(
        default=10,
        ge=1,
        le=50,
        description="Number of contacts to return",
    ),
    sort_by: str = Query(
        default="messages",
        pattern="^(messages|engagement|response_time)$",
        description="Sort criteria",
    ),
    reader: ChatDBReader = Depends(get_imessage_reader),
) -> dict[str, Any]:
    """Get ranked list of top contacts by various criteria.

    **Sort Options:**
    - `messages`: By total message count
    - `engagement`: By engagement score
    - `response_time`: By average response time (fastest first)

    **Example Response:**
    ```json
    {
        "contacts": [
            {
                "rank": 1,
                "contact_id": "chat123",
                "contact_name": "John Doe",
                "total_mess
get_trending_patterns function · python · L437-L492 (56 LOC)
api/routers/analytics.py
async def get_trending_patterns(
    request: Request,
    time_range: TimeRangeEnum = Query(
        default=TimeRangeEnum.MONTH,
        description="Time range for trend analysis",
    ),
    reader: ChatDBReader = Depends(get_imessage_reader),
) -> dict[str, Any]:
    """Get detected trends, anomalies, and patterns.

    Returns:
    - Overall activity trend
    - Trending contacts (significant changes)
    - Detected anomalies (unusual activity)
    - Seasonality patterns

    **Example Response:**
    ```json
    {
        "overall_trend": {
            "direction": "increasing",
            "percentage_change": 25.5,
            "confidence": 0.85
        },
        "trending_contacts": [
            {
                "contact_id": "chat123",
                "contact_name": "John",
                "trend": "increasing",
                "change_percent": 45.5
            }
        ],
        "anomalies": [
            {
                "date": "2024-01-20",
                "type"
export_analytics function · python · L501-L532 (32 LOC)
api/routers/analytics.py
async def export_analytics(
    request: Request,
    format: str = Query(
        default="json",
        pattern="^(json|csv)$",
        description="Export format",
    ),
    time_range: TimeRangeEnum = Query(
        default=TimeRangeEnum.MONTH,
        description="Time range for export",
    ),
    reader: ChatDBReader = Depends(get_imessage_reader),
) -> Response:
    """Export analytics data in CSV or JSON format.

    **Formats:**
    - `json`: Complete analytics as JSON
    - `csv`: Daily/weekly/monthly aggregates as CSV files (zip archive)

    Returns the file content with appropriate headers for download.
    """
    time_range_start = _get_time_range_start(time_range)

    content, media_type, filename = await run_in_threadpool(
        fetch_export_data, reader, format, time_range, time_range_start
    )

    return Response(
        content=content,
        media_type=media_type,
        headers={"Content-Disposition": f"attachment; filename={filename}"},
    )
clear_analytics_cache function · python · L541-L550 (10 LOC)
api/routers/analytics.py
async def clear_analytics_cache(request: Request) -> dict[str, str]:
    """Clear the analytics cache to force fresh computation.

    Use this after significant data changes or when troubleshooting
    stale data issues.
    """
    global _analytics_cache
    with _analytics_cache_lock:
        _analytics_cache = None
    return {"status": "ok", "message": "Analytics cache cleared"}
Repobility · code-quality intelligence platform · https://repobility.com
get_category_averages function · python · L581-L588 (8 LOC)
api/routers/analytics.py
def get_category_averages() -> list[CategoryAverageItem]:
    """Get average similarity scores per template category."""
    analytics = get_template_analytics()
    averages = analytics.get_category_averages()
    return [
        CategoryAverageItem(category=cat, average_similarity=round(avg, 4))
        for cat, avg in sorted(averages.items(), key=lambda x: x[1], reverse=True)
    ]
list_available_templates function · python · L592-L602 (11 LOC)
api/routers/analytics.py
def list_available_templates() -> list[TemplateInfo]:
    """List all available templates."""
    templates = _load_templates()
    return [
        TemplateInfo(
            name=t.name,
            pattern_count=len(t.patterns),
            sample_patterns=t.patterns[:3],
        )
        for t in templates
    ]
get_template_coverage function · python · L606-L624 (19 LOC)
api/routers/analytics.py
def get_template_coverage() -> dict[str, Any]:
    """Get overall template coverage statistics."""
    analytics = get_template_analytics()
    stats = analytics.get_stats()
    templates = _load_templates()
    total_patterns = sum(len(t.patterns) for t in templates)

    return {
        "total_templates": len(templates),
        "total_patterns": total_patterns,
        "responses_from_templates": stats["template_hits"],
        "responses_from_model": stats["model_fallbacks"],
        "coverage_percent": stats["hit_rate_percent"],
        "template_efficiency": (
            round(stats["template_hits"] / len(templates), 2)
            if len(templates) > 0 and stats["template_hits"] > 0
            else 0.0
        ),
    }
export_raw_template_analytics function · python · L628-L635 (8 LOC)
api/routers/analytics.py
def export_raw_template_analytics() -> JSONResponse:
    """Export raw template analytics data as JSON."""
    analytics = get_template_analytics()
    raw_data = analytics.export_raw()
    return JSONResponse(
        content=raw_data,
        headers={"Content-Disposition": "attachment; filename=template_analytics.json"},
    )
get_template_dashboard_data function · python · L647-L672 (26 LOC)
api/routers/analytics.py
def get_template_dashboard_data() -> dict[str, Any]:
    """Get all data needed for the template analytics dashboard."""
    analytics = get_template_analytics()
    stats = analytics.get_stats()
    templates = _load_templates()

    return {
        "summary": stats,
        "top_templates": analytics.get_top_templates(limit=20),
        "missed_queries": analytics.get_missed_queries(limit=20),
        "category_averages": [
            {"category": cat, "average_similarity": round(avg, 4)}
            for cat, avg in analytics.get_category_averages().items()
        ],
        "coverage": {
            "total_templates": len(templates),
            "total_patterns": sum(len(t.patterns) for t in templates),
            "responses_from_templates": stats["template_hits"],
            "responses_from_model": stats["model_fallbacks"],
            "coverage_percent": stats["hit_rate_percent"],
        },
        "pie_chart_data": {
            "template_responses": stats["template_hits"],
format_bytes function · python · L52-L68 (17 LOC)
api/routers/attachments.py
def format_bytes(size_bytes: int) -> str:
    """Format bytes as human-readable string.

    Args:
        size_bytes: Size in bytes

    Returns:
        Formatted string like "1.5 MB" or "2.3 GB"
    """
    if size_bytes < 1024:
        return f"{size_bytes} B"
    elif size_bytes < 1024 * 1024:
        return f"{size_bytes / 1024:.1f} KB"
    elif size_bytes < 1024 * 1024 * 1024:
        return f"{size_bytes / (1024 * 1024):.1f} MB"
    else:
        return f"{size_bytes / (1024 * 1024 * 1024):.1f} GB"
_generate_thumbnail_with_sips function · python · L71-L110 (40 LOC)
api/routers/attachments.py
def _generate_thumbnail_with_sips(source_path: Path, max_pixels: int = 1024) -> Path | None:
    """Generate a JPEG thumbnail using macOS sips and cache it in temp dir."""
    try:
        stat = source_path.stat()
        cache_key = hashlib.sha256(
            f"{source_path}:{stat.st_mtime_ns}:{stat.st_size}".encode()
        ).hexdigest()[:20]
        cache_dir = Path(tempfile.gettempdir()) / "jarvis-attachment-thumbnails"
        cache_dir.mkdir(parents=True, exist_ok=True)
        thumb_path = cache_dir / f"{cache_key}.jpg"

        if thumb_path.exists() and thumb_path.stat().st_size > 0:
            return thumb_path

        result = subprocess.run(
            [
                "sips",
                "-s",
                "format",
                "jpeg",
                "-Z",
                str(max_pixels),
                str(source_path),
                "--out",
                str(thumb_path),
            ],
            capture_output=True,
            text=True,
     
list_attachments function · python · L149-L235 (87 LOC)
api/routers/attachments.py
async def list_attachments(
    chat_id: str | None = Query(
        default=None,
        description="Filter by conversation ID",
        examples=["chat123456789"],
    ),
    attachment_type: AttachmentTypeEnum = Query(
        default=AttachmentTypeEnum.ALL,
        description="Filter by attachment type",
    ),
    after: datetime | None = Query(
        default=None,
        description="Only attachments after this date",
        examples=["2024-01-01T00:00:00Z"],
    ),
    before: datetime | None = Query(
        default=None,
        description="Only attachments before this date",
        examples=["2024-12-31T23:59:59Z"],
    ),
    limit: int = Query(
        default=100,
        ge=1,
        le=500,
        description="Maximum number of attachments to return",
    ),
    reader: ChatDBReader = Depends(get_imessage_reader),
) -> list[AttachmentWithContextResponse]:
    """List attachments with optional filtering.

    Returns attachments sorted by date (newest first), w
Open data scored by Repobility · https://repobility.com
get_attachment_stats function · python · L266-L289 (24 LOC)
api/routers/attachments.py
def get_attachment_stats(
    chat_id: str,
    reader: ChatDBReader = Depends(get_imessage_reader),
) -> AttachmentStatsResponse:
    """Get attachment statistics for a specific conversation.

    Returns the total count and size of attachments, broken down by type
    (images, videos, audio, documents, other).

    **Response includes:**
    - Total attachment count
    - Total size in bytes and human-readable format
    - Breakdown by type (count and size)
    """
    stats = reader.get_attachment_stats(chat_id)

    return AttachmentStatsResponse(
        chat_id=chat_id,
        total_count=stats["total_count"],
        total_size_bytes=stats["total_size_bytes"],
        total_size_formatted=format_bytes(stats["total_size_bytes"]),
        by_type=stats["by_type"],
        size_by_type=stats["size_by_type"],
    )
get_storage_summary function · python · L326-L370 (45 LOC)
api/routers/attachments.py
def get_storage_summary(
    limit: int = Query(
        default=50,
        ge=1,
        le=200,
        description="Maximum number of conversations to return",
    ),
    reader: ChatDBReader = Depends(get_imessage_reader),
) -> StorageSummaryResponse:
    """Get storage usage breakdown by conversation.

    Returns a summary of attachment storage across all conversations,
    sorted by total size (largest first).

    **Use cases:**
    - Identify conversations taking up the most space
    - Get total iMessage attachment storage usage
    - Plan storage cleanup
    """
    storage_data = reader.get_storage_by_conversation(limit=limit)

    total_attachments = 0
    total_size = 0
    by_conversation = []

    for item in storage_data:
        total_attachments += item["attachment_count"]
        total_size += item["total_size_bytes"]

        by_conversation.append(
            StorageByConversationResponse(
                chat_id=item["chat_id"],
                display_name=ite
get_thumbnail function · python · L393-L466 (74 LOC)
api/routers/attachments.py
def get_thumbnail(
    file_path: str = Query(
        ...,
        description="Path to the original attachment file",
    ),
    reader: ChatDBReader = Depends(get_imessage_reader),
) -> FileResponse:
    """Get a thumbnail for an attachment.

    Attempts to find and return a thumbnail for the specified attachment.
    If no thumbnail exists, returns a 404 error.

    **Note:** Only image and video attachments typically have thumbnails.
    """
    # Expand tilde in path
    if file_path.startswith("~"):
        file_path = str(Path(file_path).expanduser())

    # Security check: ensure the path is within the expected attachments directory
    attachments_base = Path.home() / "Library" / "Messages" / "Attachments"
    try:
        resolved_path = Path(file_path).resolve()
        resolved_base = attachments_base.resolve()
        # Use relative_to to ensure path is within base directory
        resolved_path.relative_to(resolved_base)
    except ValueError as e:
        raise HTTPEx
page 1 / 20next ›