Function bodies 1,000 total
get_or_create_ttl_cache function · python · L14-L46 (33 LOC)api/cache_utils.py
def get_or_create_ttl_cache(
cache_ref: list[TTLCache | None],
lock: threading.Lock,
ttl_seconds: float = 300.0,
maxsize: int = 100,
) -> TTLCache | None:
"""Get or create a TTL cache with thread-safe initialization.
Uses double-checked locking pattern for efficient thread-safe initialization.
Args:
cache_ref: List containing the cache reference (list allows mutation in closure).
lock: Lock object for synchronization.
ttl_seconds: Time-to-live for cached entries in seconds.
maxsize: Maximum number of entries in the cache.
Returns:
The TTLCache instance.
Example:
```python
_cache: list[TTLCache | None] = [None]
_cache_lock = threading.Lock()
def get_cache() -> TTLCache:
return get_or_create_ttl_cache(_cache, _cache_lock, ttl_seconds=300.0)
```
"""
if cache_ref[0] is None:
with lock:
if cache_ref[0] is None:
cacheCacheManager.get_cache method · python · L60-L81 (22 LOC)api/cache_utils.py
def get_cache(
    self,
    name: str,
    ttl_seconds: float = 300.0,
    maxsize: int = 100,
) -> TTLCache:
    """Get or create a named cache.

    The first call for a given name creates the cache and its per-cache
    lock. Later calls return the existing cache; ``ttl_seconds`` and
    ``maxsize`` are only used when the cache is created.

    Args:
        name: Unique name for the cache.
        ttl_seconds: Time-to-live for cached entries.
        maxsize: Maximum number of entries.

    Returns:
        The TTLCache instance registered under ``name``.
    """
    if name not in self._caches:
        with self._global_lock:
            # Re-check under the lock: another thread may have created the
            # cache between the unlocked test above and acquiring the lock.
            if name not in self._caches:
                cache = TTLCache(ttl_seconds=ttl_seconds, maxsize=maxsize)
                # Register the per-cache lock BEFORE publishing the cache.
                # clear_cache() tests `name in self._caches` without holding
                # the global lock and then indexes self._locks[name]; if the
                # cache were published first, that lookup could raise
                # KeyError in the window between the two assignments.
                self._locks[name] = threading.Lock()
                self._caches[name] = cache
    return self._caches[name]


# CacheManager.clear_cache method · python · L83-L96 (14 LOC)api/cache_utils.py
def clear_cache(self, name: str) -> bool:
    """Empty the cache registered under ``name``.

    Args:
        name: Name of the cache to clear.

    Returns:
        True if a cache with that name exists and was cleared, False if no
        such cache is registered.
    """
    if name not in self._caches:
        return False
    # Hold the per-cache lock while the entries are dropped.
    with self._locks[name]:
        self._caches[name].clear()
    return True


# CacheManager.clear_all method · python · L98-L103 (6 LOC)api/cache_utils.py
def clear_all(self) -> None:
    """Empty every cache registered with this manager.

    The global lock is held for the whole sweep, and each cache is cleared
    while holding its own per-cache lock.
    """
    with self._global_lock:
        for cache_name in self._caches:
            with self._locks[cache_name]:
                self._caches[cache_name].clear()


# CacheManager.get_stats method · python · L105-L119 (15 LOC)api/cache_utils.py
def get_stats(self) -> dict[str, dict[str, Any]]:
    """Get statistics for all caches.

    Returns:
        Dictionary mapping each cache name to a dict with the ``size``,
        ``maxsize`` and ``ttl_seconds`` values reported by that cache's
        ``stats()`` method.
    """
    # Take a snapshot under the global lock. get_cache() adds entries to
    # self._caches under that same lock, and iterating the live dict while
    # another thread registers a cache raises
    # "RuntimeError: dictionary changed size during iteration".
    with self._global_lock:
        snapshot = list(self._caches.items())

    # stats() is called outside the lock so a slow cache cannot block
    # cache creation in other threads.
    stats: dict[str, dict[str, Any]] = {}
    for name, cache in snapshot:
        cache_stats = cache.stats()
        stats[name] = {
            "size": cache_stats["size"],
            "maxsize": cache_stats["maxsize"],
            "ttl_seconds": cache_stats["ttl_seconds"],
        }
    return stats


# get_cache_manager function · python · L127-L138 (12 LOC)api/cache_utils.py
def get_cache_manager() -> CacheManager:
    """Return the process-wide CacheManager, creating it on first use.

    Returns:
        The global CacheManager singleton.
    """
    global _global_cache_manager
    # Fast path: the singleton already exists, no locking needed.
    if _global_cache_manager is not None:
        return _global_cache_manager
    with _global_manager_lock:
        # Re-check under the lock so only one thread constructs the manager.
        if _global_cache_manager is None:
            _global_cache_manager = CacheManager()
        return _global_cache_manager


# get_imessage_reader function · python · L17-L49 (33 LOC)api/dependencies.py
def get_imessage_reader() -> Iterator[ChatDBReader]:
    """Yield a ChatDBReader for the current request and close it afterwards.

    A new ChatDBReader is created on every call. The access check result is
    looked up in ``_access_cache`` under the key ``"imessage_access"``; on a
    cache miss ``reader.check_access()`` is called, a positive result is
    cached and a negative one invalidates the key.

    The reader is closed on every exit path: after the request finishes,
    when access is denied, and when the access check itself raises.

    Yields:
        ChatDBReader instance

    Raises:
        The exception built by ``imessage_permission_denied()`` when access
        is not granted (documented upstream as an HTTP 403).
    """
    reader = ChatDBReader()
    try:
        found, has_access = _access_cache.get("imessage_access")
        if not found:
            has_access = reader.check_access()
            if has_access:
                _access_cache.set("imessage_access", True)
            else:
                _access_cache.invalidate("imessage_access")

        if not has_access:
            # Imported lazily, as in the original, to keep import-time
            # coupling with jarvis.core out of this module.
            from jarvis.core.exceptions import imessage_permission_denied

            raise imessage_permission_denied()

        yield reader
    finally:
        # Single cleanup point: previously an exception from check_access()
        # or the cache calls left the reader open.
        reader.close()
get_status_code_for_error function · python · L94-L113 (20 LOC)api/errors.py
def get_status_code_for_error(error: JarvisError) -> int:
    """Pick the HTTP status code to send for a JARVIS error.

    Resolution order:
    1. An entry for ``error.code`` in ERROR_CODE_STATUS_CODES
    2. The ``status_code`` attribute of the error
    3. 500 when neither is available

    Args:
        error: The JARVIS error instance.

    Returns:
        HTTP status code for the response.
    """
    if error.code not in ERROR_CODE_STATUS_CODES:
        # No per-code override: fall back to the attribute on the error.
        return getattr(error, "status_code", 500)
    # Per-code overrides win (e.g. MDL_NOT_FOUND -> 404).
    return ERROR_CODE_STATUS_CODES[error.code]


# build_error_response function · python · L116-L135 (20 LOC)api/errors.py
def build_error_response(error: JarvisError) -> dict[str, Any]:
    """Turn a JARVIS error into the standard JSON error body.

    Args:
        error: The JARVIS error instance.

    Returns:
        Dictionary with ``error`` (class name), ``code`` and ``detail``,
        plus ``details`` when the error carries a truthy ``details`` value.
    """
    body: dict[str, Any] = dict(
        error=error.__class__.__name__,
        code=error.code.value,
        detail=error.message,
    )
    extra = error.details
    if extra:
        # Only attach the optional payload when there is something in it.
        body["details"] = extra
    return body


# jarvis_error_handler function · python · L138-L179 (42 LOC)api/errors.py
async def jarvis_error_handler(request: Request, exc: JarvisError) -> JSONResponse:
"""Handle JarvisError and subclasses.
Converts JARVIS errors to appropriate HTTP responses with
standardized error format.
Args:
request: The FastAPI request object.
exc: The JARVIS error that was raised.
Returns:
JSONResponse with appropriate status code and error body.
"""
status_code = get_status_code_for_error(exc)
response_body = build_error_response(exc)
# Log the error with appropriate level
if status_code >= 500:
logger.error(
"Server error: %s (code=%s, status=%d)",
exc.message,
exc.code.value,
status_code,
exc_info=exc.cause if exc.cause else exc,
)
else:
logger.warning(
"Client error: %s (code=%s, status=%d)",
exc.message,
exc.code.value,
status_code,
)
headers = {}
if statusvalidation_error_handler function · python · L182-L206 (25 LOC)api/errors.py
async def validation_error_handler(request: Request, exc: ValidationError) -> JSONResponse:
    """Answer a ValidationError with a 400 response.

    Args:
        request: The FastAPI request object.
        exc: The validation error that was raised.

    Returns:
        JSONResponse with 400 status and the standard error body.
    """
    payload = build_error_response(exc)
    # Validation failures are logged at debug level only.
    logger.debug("Validation error on %s %s: %s", request.method, request.url.path, exc.message)
    return JSONResponse(content=payload, status_code=400)


# imessage_access_error_handler function · python · L209-L233 (25 LOC)api/errors.py
async def imessage_access_error_handler(request: Request, exc: iMessageAccessError) -> JSONResponse:
    """Answer an iMessageAccessError with a 403 response.

    Args:
        request: The FastAPI request object.
        exc: The iMessage access error that was raised.

    Returns:
        JSONResponse with 403 status and the standard error body.
    """
    payload = build_error_response(exc)
    logger.warning(
        "iMessage access denied for %s %s: %s", request.method, request.url.path, exc.message
    )
    return JSONResponse(content=payload, status_code=403)


# model_load_error_handler function · python · L236-L260 (25 LOC)api/errors.py
async def model_load_error_handler(request: Request, exc: ModelLoadError) -> JSONResponse:
    """Answer a ModelLoadError with a 503 response and a retry hint.

    Args:
        request: The FastAPI request object.
        exc: The model load error that was raised.

    Returns:
        JSONResponse with 503 status and a ``Retry-After: 30`` header.
    """
    payload = build_error_response(exc)
    # Log the underlying cause's traceback when there is one, otherwise the
    # error itself.
    traceback_source = exc.cause or exc
    logger.error(
        "Model load failed for %s %s: %s",
        request.method,
        request.url.path,
        exc.message,
        exc_info=traceback_source,
    )
    retry_headers = {"Retry-After": "30"}  # Suggest retry after 30 seconds
    return JSONResponse(content=payload, status_code=503, headers=retry_headers)


# resource_error_handler function · python · L263-L286 (24 LOC)api/errors.py
async def resource_error_handler(request: Request, exc: ResourceError) -> JSONResponse:
    """Answer a ResourceError with a 503 response and a retry hint.

    Args:
        request: The FastAPI request object.
        exc: The resource error that was raised.

    Returns:
        JSONResponse with 503 status and a ``Retry-After: 60`` header.
    """
    payload = build_error_response(exc)
    logger.error("Resource error for %s %s: %s", request.method, request.url.path, exc.message)
    retry_headers = {"Retry-After": "60"}  # Suggest retry after 60 seconds
    return JSONResponse(content=payload, status_code=503, headers=retry_headers)


# timeout_error_handler function · python · L289-L318 (30 LOC)api/errors.py
async def timeout_error_handler(request: Request, exc: TimeoutError) -> JSONResponse:
    """Answer a TimeoutError with a 408 Request Timeout response.

    The exception itself is not inspected; the response body is fixed.

    Args:
        request: The FastAPI request object.
        exc: The timeout error that was raised.

    Returns:
        JSONResponse with 408 status and a ``Retry-After: 5`` header.
    """
    logger.warning("Request timeout for %s %s", request.method, request.url.path)
    payload = dict(
        error="RequestTimeout",
        code="REQUEST_TIMEOUT",
        detail="The request timed out. Please try again.",
    )
    retry_headers = {"Retry-After": "5"}  # Suggest retry after 5 seconds
    return JSONResponse(content=payload, status_code=408, headers=retry_headers)
generic_exception_handler function · python · L321-L350 (30 LOC)api/errors.py
async def generic_exception_handler(request: Request, exc: Exception) -> JSONResponse:
    """Answer any otherwise unhandled exception with a generic 500.

    The exception is logged via ``logger.exception``; the response body is a
    fixed message that does not include the exception text.

    Args:
        request: The FastAPI request object.
        exc: The unexpected exception.

    Returns:
        JSONResponse with 500 status and a generic error message.
    """
    logger.exception(
        "Unexpected error handling %s %s: %s", request.method, request.url.path, str(exc)
    )
    payload = dict(
        error="InternalError",
        code="INTERNAL_ERROR",
        detail="An unexpected error occurred. Please try again later.",
    )
    return JSONResponse(content=payload, status_code=500)


# register_exception_handlers function · python · L353-L382 (30 LOC)api/errors.py
def register_exception_handlers(app: FastAPI) -> None:
"""Register all JARVIS exception handlers with a FastAPI app.
This function should be called once when setting up the FastAPI app
to ensure all JARVIS errors are handled consistently.
Args:
app: The FastAPI application instance.
Example:
app = FastAPI()
register_exception_handlers(app)
"""
# Register specific handlers first (most specific to least specific)
# Note: type: ignore is needed due to Starlette's overly strict typing
app.add_exception_handler(ValidationError, validation_error_handler) # type: ignore[arg-type]
app.add_exception_handler(iMessageAccessError, imessage_access_error_handler) # type: ignore[arg-type]
app.add_exception_handler(ModelLoadError, model_load_error_handler) # type: ignore[arg-type]
app.add_exception_handler(ResourceError, resource_error_handler) # type: ignore[arg-type]
# Register the base JarvisError handler (catches alifespan function · python · L90-L126 (37 LOC)api/main.py
async def lifespan(app_instance: FastAPI) -> AsyncIterator[None]:
"""Lifecycle event handler for the FastAPI application."""
# Start model warmer
import asyncio
from jarvis.interfaces.desktop.server import JarvisSocketServer
from jarvis.model_warmer import get_model_warmer
from jarvis.tasks.worker import start_worker, stop_worker
# Initialize socket server (direct desktop communication)
# We run it in the same process to share the GPU memory pool
socket_server = JarvisSocketServer(
enable_watcher=True,
preload_models=False, # API already warms models
enable_prefetch=True,
)
# Start socket server in background
socket_task = asyncio.create_task(socket_server.serve_forever())
app_instance.state.socket_server = socket_server
get_model_warmer().start()
start_worker()
yield
# Stop services
stop_worker()
get_model_warmer().stop()
# Stop socket server
await socket_server.stop()
_create_openapi_generator function · python · L262-L303 (42 LOC)api/main.py
def _create_openapi_generator(app_instance: FastAPI) -> callable: # type: ignore[valid-type]
"""Create a custom OpenAPI schema generator for the given app instance."""
def custom_openapi() -> dict[str, Any]:
"""Generate custom OpenAPI schema with enhanced documentation."""
if app_instance.openapi_schema:
return app_instance.openapi_schema
openapi_schema = get_openapi(
title=API_TITLE,
version=API_VERSION,
description=API_DESCRIPTION,
routes=app_instance.routes,
tags=API_TAGS_METADATA,
)
# Add contact and license info
openapi_schema["info"]["contact"] = API_CONTACT
openapi_schema["info"]["license"] = API_LICENSE
# Add server information
openapi_schema["servers"] = [
{
"url": "http://localhost:8742",
"description": "Local development server",
},
{
"ur_configure_middleware function · python · L306-L354 (49 LOC)api/main.py
def _configure_middleware(app_instance: FastAPI) -> None:
"""Configure middleware for the FastAPI application."""
# Configure CORS for Tauri and development
app_instance.add_middleware(
CORSMiddleware,
allow_origins=[
"tauri://localhost", # Tauri production
"http://localhost:5173", # Vite dev server
"http://localhost:1420", # Tauri dev server
"http://127.0.0.1:5173",
"http://127.0.0.1:1420",
],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# Enable GZip compression for responses (50-75% bandwidth savings for large JSON responses)
app_instance.add_middleware(GZipMiddleware, minimum_size=500)
# Rate limiting middleware (applies default_limits to all routes)
app_instance.add_middleware(SlowAPIMiddleware)
# Request timing middleware
@app_instance.middleware("http")
async def metrics_middleware(request: Request, c_register_routers function · python · L357-L418 (62 LOC)api/main.py
def _register_routers(app_instance: FastAPI) -> None:
"""Register all API routers explicitly.
Imports routers on-demand to avoid eager loading at module import time.
This improves testability and reduces import-time coupling.
"""
# Import routers locally to avoid module-level coupling
from api.routers.analytics import router as analytics_router
from api.routers.attachments import router as attachments_router
from api.routers.batch import router as batch_router
from api.routers.calendar import router as calendar_router
from api.routers.contacts import router as contacts_router
from api.routers.conversations import router as conversations_router
from api.routers.custom_templates import router as custom_templates_router
from api.routers.drafts import router as drafts_router
# from api.routers.embeddings import router as embeddings_router # Missing
from api.routers.experiments import router as experiments_router
from api.rocreate_app function · python · L421-L462 (42 LOC)api/main.py
def create_app() -> FastAPI:
"""Application factory for creating configured FastAPI instances.
This factory pattern allows:
- Explicit router registration without import-time coupling
- Easy creation of test apps with subset of routers
- Better testability and modularity
Returns:
Configured FastAPI application instance
"""
# Create FastAPI app
app_instance = FastAPI(
title=API_TITLE,
description=API_DESCRIPTION,
version=API_VERSION,
docs_url="/docs",
redoc_url="/redoc",
openapi_url="/openapi.json",
openapi_tags=API_TAGS_METADATA,
contact=API_CONTACT,
license_info=API_LICENSE,
lifespan=lifespan,
)
# Configure rate limiting
app_instance.state.limiter = limiter
app_instance.add_exception_handler(RateLimitExceeded, rate_limit_exceeded_handler) # type: ignore[arg-type]
# Use custom OpenAPI schema
app_instance.openapi = _create_openapi_gener_get_cached_config_value function · python · L44-L67 (24 LOC)api/ratelimit.py
def _get_cached_config_value(key: str, getter: Callable[[], Any], default: Any) -> Any:
    """Return a config value, served from ``_config_cache`` while fresh.

    Args:
        key: Cache key
        getter: Function to get the value on a cache miss or expired entry
        default: Value returned when ``getter`` raises (not cached)

    Returns:
        Config value
    """
    now = time.time()
    try:
        cached_value, stored_at = _config_cache[key]
    except KeyError:
        pass  # Nothing cached yet for this key.
    else:
        if now - stored_at < _CONFIG_CACHE_TTL:
            return cached_value

    try:
        fresh_value = getter()
        _config_cache[key] = (fresh_value, now)
    except Exception:
        # Best effort: a failed lookup falls back to the default.
        logger.debug("Config lookup failed for %s, using default", key)
        return default
    return fresh_value
get_remote_address function · python · L70-L91 (22 LOC)api/ratelimit.py
def get_remote_address(request: Request) -> str:
    """Get client identifier for rate limiting.

    Non-local clients are identified by the address slowapi reports. For
    loopback addresses the identifier is ``"<ip>:<bucket>"``, where
    ``bucket`` is a number in ``0..9999`` derived from the User-Agent
    header, so different local clients get different identifiers.

    Args:
        request: The FastAPI request object.

    Returns:
        String identifier for the client.
    """
    import hashlib

    # Use slowapi's default for IP
    ip = _get_remote_address(request) or "unknown"
    if ip not in ("127.0.0.1", "localhost", "::1"):
        return ip

    # For localhost, add user-agent to differentiate clients
    user_agent = request.headers.get("user-agent", "unknown")
    # Bucket the user agent with a real digest rather than builtin hash():
    # str hashes are salted per process (PYTHONHASHSEED), so the same client
    # would get a different key in every worker and after every restart.
    # The raw user agent still never appears in the key.
    digest = hashlib.sha256(user_agent.encode("utf-8", errors="replace")).digest()
    bucket = int.from_bytes(digest[:8], "big") % 10000
    return f"{ip}:{bucket}"


# rate_limit_exceeded_handler function · python · L109-L156 (48 LOC)api/ratelimit.py
def rate_limit_exceeded_handler(request: Request, exc: RateLimitExceeded) -> Response:
"""Handle rate limit exceeded errors with proper 429 response.
Args:
request: The FastAPI request object.
exc: The rate limit exception.
Returns:
JSON response with 429 status and retry-after header.
"""
from fastapi.responses import JSONResponse
# Extract retry-after from the exception if available
retry_after = 60 # Default to 60 seconds
if hasattr(exc, "detail") and exc.detail:
# Try to parse the limit from the detail message
try:
# Detail format: "Rate limit exceeded: X per Y minute"
parts = str(exc.detail).split()
for i, part in enumerate(parts):
if part.isdigit() and i + 1 < len(parts):
if "minute" in parts[i + 1]:
retry_after = 60
elif "second" in parts[i + 1]:
retry_after = int(pawith_timeout function · python · L159-L192 (34 LOC)api/ratelimit.py
def with_timeout(timeout_seconds: float) -> Callable[[F], F]:
    """Build a decorator that bounds an async endpoint's run time.

    When the wrapped coroutine does not finish in time, a fastapi
    HTTPException with status 408 is raised instead of the TimeoutError.

    Args:
        timeout_seconds: Maximum time in seconds before timeout.

    Returns:
        Decorator function.

    Usage:
        @with_timeout(30.0)
        async def my_endpoint():
            ...
    """

    def decorator(func: F) -> F:
        @wraps(func)
        async def wrapper(*args: Any, **kwargs: Any) -> Any:
            try:
                pending = func(*args, **kwargs)
                return await asyncio.wait_for(pending, timeout=timeout_seconds)
            except TimeoutError:
                # Imported lazily so fastapi is only needed on the error path.
                from fastapi import HTTPException

                message = f"Request timed out after {timeout_seconds} seconds"
                # `from None` hides the internal TimeoutError from the chain.
                raise HTTPException(status_code=408, detail=message) from None

        return wrapper  # type: ignore[return-value]

    return decorator


# run_in_threadpool function · python · L195-L221 (27 LOC)api/ratelimit.py
def run_in_threadpool(func: Callable[..., Any]) -> Callable[..., Awaitable[Any]]:
    """Wrap a synchronous function so awaiting it runs it in a thread pool.

    The work is delegated to ``starlette.concurrency.run_in_threadpool``,
    so the event loop is not blocked while ``func`` runs.

    Args:
        func: Synchronous function to wrap.

    Returns:
        Async function that runs the original in a thread pool.

    Usage:
        @run_in_threadpool
        def cpu_intensive_work():
            ...

        async def endpoint():
            result = await cpu_intensive_work()
    """
    from starlette.concurrency import run_in_threadpool as starlette_threadpool

    async def wrapper(*args: Any, **kwargs: Any) -> Any:
        return await starlette_threadpool(func, *args, **kwargs)

    # Same as decorating `wrapper` with @wraps(func).
    return wraps(func)(wrapper)


# get_timeout_generation function · python · L224-L238 (15 LOC)api/ratelimit.py
def get_timeout_generation() -> float:
    """Return the generation timeout, read through the config cache.

    Falls back to 30.0 when the config lookup fails.

    Returns:
        Timeout in seconds for generation operations.
    """

    def read_from_config() -> Any:
        return get_config().rate_limit.generation_timeout_seconds

    raw_timeout = _get_cached_config_value(
        "timeout_generation",
        read_from_config,
        30.0,  # Default fallback
    )
    return float(raw_timeout)


# get_timeout_read function · python · L241-L255 (15 LOC)api/ratelimit.py
def get_timeout_read() -> float:
    """Return the read timeout, read through the config cache.

    Falls back to 10.0 when the config lookup fails.

    Returns:
        Timeout in seconds for read operations.
    """

    def read_from_config() -> Any:
        return get_config().rate_limit.read_timeout_seconds

    raw_timeout = _get_cached_config_value(
        "timeout_read",
        read_from_config,
        10.0,  # Default fallback
    )
    return float(raw_timeout)


# get_analytics_cache function · python · L91-L101 (11 LOC)api/routers/analytics.py
def get_analytics_cache() -> TTLCache:
    """Get the analytics cache (5-minute TTL, 100 entries), creating it if needed.

    The module-level cache is created lazily under ``_analytics_cache_lock``.

    Returns:
        The current TTLCache instance; never None.
    """
    global _analytics_cache
    # Work on a local reference. clear_analytics_cache() rebinds the global
    # to None, so re-reading the global after the None check could hand
    # None back to the caller.
    cache = _analytics_cache
    if cache is None:
        with _analytics_cache_lock:
            # Re-check under the lock: another thread may have created it.
            cache = _analytics_cache
            if cache is None:
                cache = TTLCache(ttl_seconds=300.0, maxsize=100)
                _analytics_cache = cache
    return cache


# _get_time_range_start function · python · L104-L113 (10 LOC)api/routers/analytics.py
def _get_time_range_start(time_range: TimeRangeEnum) -> datetime | None:
    """Return the UTC start of ``time_range`` measured back from now.

    Returns None for any range without a fixed look-back window (all time).
    """
    now = datetime.now(UTC)
    lookback_days = (
        (TimeRangeEnum.WEEK, 7),
        (TimeRangeEnum.MONTH, 30),
        (TimeRangeEnum.THREE_MONTHS, 90),
    )
    for candidate, days in lookback_days:
        if time_range == candidate:
            return now - timedelta(days=days)
    return None  # all_time
get_analytics_overview function · python · L122-L173 (52 LOC)api/routers/analytics.py
async def get_analytics_overview(
request: Request,
time_range: TimeRangeEnum = Query(
default=TimeRangeEnum.MONTH,
description="Time range for analytics",
),
reader: ChatDBReader = Depends(get_imessage_reader),
) -> dict[str, Any]:
"""Get comprehensive overview metrics for the analytics dashboard.
Returns aggregated statistics including:
- Total messages (sent/received)
- Active conversations count
- Average messages per day
- Average response time
- Overall sentiment
- Peak activity times
- Period comparison (vs previous period)
**Example Response:**
```json
{
"total_messages": 5000,
"sent_messages": 2500,
"received_messages": 2500,
"active_conversations": 45,
"avg_messages_per_day": 50.5,
"avg_response_time_minutes": 12.5,
"sentiment": {"score": 0.35, "label": "positive"},
"peak_hour": 14,
"peak_day": "Wednesday",
"period_cget_analytics_timeline function · python · L182-L241 (60 LOC)api/routers/analytics.py
async def get_analytics_timeline(
request: Request,
granularity: str = Query(
default="day",
pattern="^(hour|day|week|month)$",
description="Time granularity (hour, day, week, month)",
),
time_range: TimeRangeEnum = Query(
default=TimeRangeEnum.MONTH,
description="Time range for data",
),
metric: str = Query(
default="messages",
pattern="^(messages|sentiment|response_time)$",
description="Metric to track",
),
reader: ChatDBReader = Depends(get_imessage_reader),
) -> dict[str, Any]:
"""Get time-series data for chart visualization.
Supports multiple granularities and metrics for flexible charting.
**Granularities:**
- `hour`: Hourly breakdown (24 data points)
- `day`: Daily counts
- `week`: Weekly aggregates
- `month`: Monthly aggregates
**Metrics:**
- `messages`: Message counts (total, sent, received)
- `sentiment`: Average sentiment scores
- `reget_activity_heatmap function · python · L250-L297 (48 LOC)api/routers/analytics.py
async def get_activity_heatmap(
request: Request,
time_range: TimeRangeEnum = Query(
default=TimeRangeEnum.THREE_MONTHS,
description="Time range for heatmap",
),
reader: ChatDBReader = Depends(get_imessage_reader),
) -> dict[str, Any]:
"""Get GitHub-style activity heatmap data.
Returns daily activity levels for calendar visualization.
**Activity Levels:**
- 0: No activity
- 1: Low (1-5 messages)
- 2: Medium (6-15 messages)
- 3: High (16-30 messages)
- 4: Very high (31+ messages)
**Example Response:**
```json
{
"data": [
{"date": "2024-01-15", "count": 45, "level": 4},
{"date": "2024-01-16", "count": 12, "level": 2}
],
"stats": {
"total_days": 90,
"active_days": 75,
"max_count": 85,
"avg_count": 25.5
}
}
```
"""
cache_key = f"heatmap:{time_range.value}"
cache = get_analytics_cache()
get_contact_stats function · python · L306-L362 (57 LOC)api/routers/analytics.py
async def get_contact_stats(
request: Request,
chat_id: str,
time_range: TimeRangeEnum = Query(
default=TimeRangeEnum.MONTH,
description="Time range for statistics",
),
reader: ChatDBReader = Depends(get_imessage_reader),
) -> dict[str, Any]:
"""Get detailed analytics for a specific contact.
Returns comprehensive statistics including:
- Message counts and balance
- Response time metrics
- Sentiment analysis
- Activity patterns
- Engagement score
**Example Response:**
```json
{
"contact_id": "chat123456",
"contact_name": "John Doe",
"total_messages": 500,
"sent_count": 245,
"received_count": 255,
"avg_response_time_minutes": 8.5,
"sentiment_score": 0.42,
"engagement_score": 78.5,
"message_trend": "increasing",
"hourly_distribution": {...},
"daily_distribution": {...}
}
```
"""
cache_key = f"contact:{chat_id}get_contacts_leaderboard function · python · L371-L428 (58 LOC)api/routers/analytics.py
async def get_contacts_leaderboard(
request: Request,
time_range: TimeRangeEnum = Query(
default=TimeRangeEnum.MONTH,
description="Time range for ranking",
),
limit: int = Query(
default=10,
ge=1,
le=50,
description="Number of contacts to return",
),
sort_by: str = Query(
default="messages",
pattern="^(messages|engagement|response_time)$",
description="Sort criteria",
),
reader: ChatDBReader = Depends(get_imessage_reader),
) -> dict[str, Any]:
"""Get ranked list of top contacts by various criteria.
**Sort Options:**
- `messages`: By total message count
- `engagement`: By engagement score
- `response_time`: By average response time (fastest first)
**Example Response:**
```json
{
"contacts": [
{
"rank": 1,
"contact_id": "chat123",
"contact_name": "John Doe",
"total_messget_trending_patterns function · python · L437-L492 (56 LOC)api/routers/analytics.py
async def get_trending_patterns(
request: Request,
time_range: TimeRangeEnum = Query(
default=TimeRangeEnum.MONTH,
description="Time range for trend analysis",
),
reader: ChatDBReader = Depends(get_imessage_reader),
) -> dict[str, Any]:
"""Get detected trends, anomalies, and patterns.
Returns:
- Overall activity trend
- Trending contacts (significant changes)
- Detected anomalies (unusual activity)
- Seasonality patterns
**Example Response:**
```json
{
"overall_trend": {
"direction": "increasing",
"percentage_change": 25.5,
"confidence": 0.85
},
"trending_contacts": [
{
"contact_id": "chat123",
"contact_name": "John",
"trend": "increasing",
"change_percent": 45.5
}
],
"anomalies": [
{
"date": "2024-01-20",
"type"export_analytics function · python · L501-L532 (32 LOC)api/routers/analytics.py
async def export_analytics(
    request: Request,
    format: str = Query(
        default="json",
        pattern="^(json|csv)$",
        description="Export format",
    ),
    time_range: TimeRangeEnum = Query(
        default=TimeRangeEnum.MONTH,
        description="Time range for export",
    ),
    reader: ChatDBReader = Depends(get_imessage_reader),
) -> Response:
    """Export analytics data in CSV or JSON format.

    **Formats:**
    - `json`: Complete analytics as JSON
    - `csv`: Daily/weekly/monthly aggregates as CSV files (zip archive)

    Returns the file content with appropriate headers for download.
    """
    range_start = _get_time_range_start(time_range)
    # The export is produced off the event loop in a worker thread.
    exported = await run_in_threadpool(fetch_export_data, reader, format, time_range, range_start)
    content, media_type, filename = exported
    disposition = f"attachment; filename={filename}"
    return Response(
        content=content,
        media_type=media_type,
        headers={"Content-Disposition": disposition},
    )


# clear_analytics_cache function · python · L541-L550 (10 LOC)api/routers/analytics.py
async def clear_analytics_cache(request: Request) -> dict[str, str]:
    """Drop the analytics cache so the next request recomputes its data.

    Use this after significant data changes or when troubleshooting
    stale data issues.
    """
    global _analytics_cache
    confirmation = {"status": "ok", "message": "Analytics cache cleared"}
    with _analytics_cache_lock:
        # Discard the whole cache object; it is recreated lazily on next use.
        _analytics_cache = None
    return confirmation
get_category_averages function · python · L581-L588 (8 LOC)api/routers/analytics.py
def get_category_averages() -> list[CategoryAverageItem]:
    """Return per-category average similarity, highest average first.

    Averages are rounded to four decimal places.
    """
    averages = get_template_analytics().get_category_averages()
    ranked = sorted(averages.items(), key=lambda pair: pair[1], reverse=True)
    items = []
    for category, average in ranked:
        items.append(
            CategoryAverageItem(category=category, average_similarity=round(average, 4))
        )
    return items


# list_available_templates function · python · L592-L602 (11 LOC)api/routers/analytics.py
def list_available_templates() -> list[TemplateInfo]:
    """Describe every loaded template.

    Each entry carries the template name, its pattern count and up to the
    first three patterns as samples.
    """
    infos = []
    for template in _load_templates():
        patterns = template.patterns
        infos.append(
            TemplateInfo(
                name=template.name,
                pattern_count=len(patterns),
                sample_patterns=patterns[:3],
            )
        )
    return infos


# get_template_coverage function · python · L606-L624 (19 LOC)api/routers/analytics.py
def get_template_coverage() -> dict[str, Any]:
    """Summarize how much traffic is answered from templates.

    ``template_efficiency`` is template hits per template, rounded to two
    decimals, or 0.0 when there are no templates or no hits.
    """
    stats = get_template_analytics().get_stats()
    templates = _load_templates()
    template_count = len(templates)
    total_patterns = sum(len(t.patterns) for t in templates)
    hits = stats["template_hits"]

    coverage: dict[str, Any] = {
        "total_templates": template_count,
        "total_patterns": total_patterns,
        "responses_from_templates": hits,
        "responses_from_model": stats["model_fallbacks"],
        "coverage_percent": stats["hit_rate_percent"],
    }
    if template_count > 0 and hits > 0:
        coverage["template_efficiency"] = round(hits / template_count, 2)
    else:
        coverage["template_efficiency"] = 0.0
    return coverage


# export_raw_template_analytics function · python · L628-L635 (8 LOC)api/routers/analytics.py
def export_raw_template_analytics() -> JSONResponse:
    """Return the raw template analytics as a downloadable JSON attachment."""
    raw_data = get_template_analytics().export_raw()
    download_headers = {"Content-Disposition": "attachment; filename=template_analytics.json"}
    return JSONResponse(content=raw_data, headers=download_headers)


# get_template_dashboard_data function · python · L647-L672 (26 LOC)api/routers/analytics.py
def get_template_dashboard_data() -> dict[str, Any]:
"""Get all data needed for the template analytics dashboard."""
analytics = get_template_analytics()
stats = analytics.get_stats()
templates = _load_templates()
return {
"summary": stats,
"top_templates": analytics.get_top_templates(limit=20),
"missed_queries": analytics.get_missed_queries(limit=20),
"category_averages": [
{"category": cat, "average_similarity": round(avg, 4)}
for cat, avg in analytics.get_category_averages().items()
],
"coverage": {
"total_templates": len(templates),
"total_patterns": sum(len(t.patterns) for t in templates),
"responses_from_templates": stats["template_hits"],
"responses_from_model": stats["model_fallbacks"],
"coverage_percent": stats["hit_rate_percent"],
},
"pie_chart_data": {
"template_responses": stats["template_hits"],format_bytes function · python · L52-L68 (17 LOC)api/routers/attachments.py
def format_bytes(size_bytes: int) -> str:
    """Format bytes as human-readable string.

    Uses binary units (1 KB = 1024 B). Values below 1024 are shown as whole
    bytes; larger values are shown with one decimal in KB, MB or GB. GB is
    the largest unit, so very large sizes stay in GB.

    Args:
        size_bytes: Size in bytes

    Returns:
        Formatted string like "1.5 MB" or "2.3 GB"
    """
    if size_bytes < 1024:
        return f"{size_bytes} B"

    value = size_bytes / 1024
    for unit in ("KB", "MB"):
        # Compare the value as it will be *displayed*: 1023.96 KB rounds to
        # "1024.0 KB", which should read "1.0 MB" instead.
        if round(value, 1) < 1024:
            return f"{value:.1f} {unit}"
        value /= 1024
    return f"{value:.1f} GB"


# _generate_thumbnail_with_sips function · python · L71-L110 (40 LOC)api/routers/attachments.py
def _generate_thumbnail_with_sips(source_path: Path, max_pixels: int = 1024) -> Path | None:
"""Generate a JPEG thumbnail using macOS sips and cache it in temp dir."""
try:
stat = source_path.stat()
cache_key = hashlib.sha256(
f"{source_path}:{stat.st_mtime_ns}:{stat.st_size}".encode()
).hexdigest()[:20]
cache_dir = Path(tempfile.gettempdir()) / "jarvis-attachment-thumbnails"
cache_dir.mkdir(parents=True, exist_ok=True)
thumb_path = cache_dir / f"{cache_key}.jpg"
if thumb_path.exists() and thumb_path.stat().st_size > 0:
return thumb_path
result = subprocess.run(
[
"sips",
"-s",
"format",
"jpeg",
"-Z",
str(max_pixels),
str(source_path),
"--out",
str(thumb_path),
],
capture_output=True,
text=True,
list_attachments function · python · L149-L235 (87 LOC)api/routers/attachments.py
async def list_attachments(
chat_id: str | None = Query(
default=None,
description="Filter by conversation ID",
examples=["chat123456789"],
),
attachment_type: AttachmentTypeEnum = Query(
default=AttachmentTypeEnum.ALL,
description="Filter by attachment type",
),
after: datetime | None = Query(
default=None,
description="Only attachments after this date",
examples=["2024-01-01T00:00:00Z"],
),
before: datetime | None = Query(
default=None,
description="Only attachments before this date",
examples=["2024-12-31T23:59:59Z"],
),
limit: int = Query(
default=100,
ge=1,
le=500,
description="Maximum number of attachments to return",
),
reader: ChatDBReader = Depends(get_imessage_reader),
) -> list[AttachmentWithContextResponse]:
"""List attachments with optional filtering.
Returns attachments sorted by date (newest first), wOpen data scored by Repobility · https://repobility.com
get_attachment_stats function · python · L266-L289 (24 LOC)api/routers/attachments.py
def get_attachment_stats(
    chat_id: str,
    reader: ChatDBReader = Depends(get_imessage_reader),
) -> AttachmentStatsResponse:
    """Get attachment statistics for a specific conversation.

    Returns the total count and size of attachments, broken down by type
    (images, videos, audio, documents, other).

    **Response includes:**
    - Total attachment count
    - Total size in bytes and human-readable format
    - Breakdown by type (count and size)
    """
    stats = reader.get_attachment_stats(chat_id)
    total_bytes = stats["total_size_bytes"]
    return AttachmentStatsResponse(
        chat_id=chat_id,
        total_count=stats["total_count"],
        total_size_bytes=total_bytes,
        total_size_formatted=format_bytes(total_bytes),
        by_type=stats["by_type"],
        size_by_type=stats["size_by_type"],
    )


# get_storage_summary function · python · L326-L370 (45 LOC)api/routers/attachments.py
def get_storage_summary(
limit: int = Query(
default=50,
ge=1,
le=200,
description="Maximum number of conversations to return",
),
reader: ChatDBReader = Depends(get_imessage_reader),
) -> StorageSummaryResponse:
"""Get storage usage breakdown by conversation.
Returns a summary of attachment storage across all conversations,
sorted by total size (largest first).
**Use cases:**
- Identify conversations taking up the most space
- Get total iMessage attachment storage usage
- Plan storage cleanup
"""
storage_data = reader.get_storage_by_conversation(limit=limit)
total_attachments = 0
total_size = 0
by_conversation = []
for item in storage_data:
total_attachments += item["attachment_count"]
total_size += item["total_size_bytes"]
by_conversation.append(
StorageByConversationResponse(
chat_id=item["chat_id"],
display_name=iteget_thumbnail function · python · L393-L466 (74 LOC)api/routers/attachments.py
def get_thumbnail(
file_path: str = Query(
...,
description="Path to the original attachment file",
),
reader: ChatDBReader = Depends(get_imessage_reader),
) -> FileResponse:
"""Get a thumbnail for an attachment.
Attempts to find and return a thumbnail for the specified attachment.
If no thumbnail exists, returns a 404 error.
**Note:** Only image and video attachments typically have thumbnails.
"""
# Expand tilde in path
if file_path.startswith("~"):
file_path = str(Path(file_path).expanduser())
# Security check: ensure the path is within the expected attachments directory
attachments_base = Path.home() / "Library" / "Messages" / "Attachments"
try:
resolved_path = Path(file_path).resolve()
resolved_base = attachments_base.resolve()
# Use relative_to to ensure path is within base directory
resolved_path.relative_to(resolved_base)
except ValueError as e:
raise HTTPExpage 1 / 20next ›