← back to filthyrake__vlog

Function bodies 1,000 total

All specs Real LLM only Function bodies
get_video_sprite_status function · python · L9816-L9836 (21 LOC)
api/admin.py
async def get_video_sprite_status(request: Request, video_id: int) -> SpriteStatusResponse:
    """
    Report the sprite sheet state stored for a single video.

    Looks up the video row with the given ID (rows with a non-NULL
    ``deleted_at`` are excluded) and copies its ``sprite_sheet_*`` columns
    into a SpriteStatusResponse.

    Raises:
        HTTPException: 404 if no matching, non-deleted video row is found.
    """
    lookup = videos.select().where(videos.c.id == video_id).where(videos.c.deleted_at.is_(None))
    record = await fetch_one_with_retry(lookup)

    if not record:
        raise HTTPException(status_code=404, detail="Video not found")

    payload = {
        "video_id": video_id,
        "status": record["sprite_sheet_status"],
        "error": record["sprite_sheet_error"],
        # A missing (falsy) sheet count is reported as 0.
        "count": record["sprite_sheet_count"] or 0,
        "interval": record["sprite_sheet_interval"],
        "tile_size": record["sprite_sheet_tile_size"],
        "frame_width": record["sprite_sheet_frame_width"],
        "frame_height": record["sprite_sheet_frame_height"],
    }
    return SpriteStatusResponse(**payload)
cancel_sprite_job function · python · L9841-L9870 (30 LOC)
api/admin.py
async def cancel_sprite_job(request: Request, job_id: int) -> dict:
    """
    Cancel a sprite generation job that is still waiting in the queue.

    The job is marked ``cancelled`` in ``sprite_queue`` and the owning
    video's ``sprite_sheet_status`` / ``sprite_sheet_error`` are cleared.

    Raises:
        HTTPException: 404 if the job does not exist, 400 if its status is
            anything other than ``pending``.
    """
    lookup = sa.text("SELECT id, video_id, status FROM sprite_queue WHERE id = :id").bindparams(id=job_id)
    queued = await fetch_one_with_retry(lookup)

    if not queued:
        raise HTTPException(status_code=404, detail="Job not found")

    current_status = queued["status"]
    if current_status != "pending":
        raise HTTPException(status_code=400, detail=f"Cannot cancel job with status '{current_status}'")

    # Step 1: flag the queue entry as cancelled.
    mark_cancelled = sprite_queue.update().where(sprite_queue.c.id == job_id).values(status="cancelled")
    await db_execute_with_retry(mark_cancelled)

    # Step 2: clear the sprite state on the video the job belonged to.
    clear_video_state = (
        videos.update()
        .where(videos.c.id == queued["video_id"])
        .values(sprite_sheet_status=None, sprite_sheet_error=None)
    )
    await db_execute_with_retry(clear_video_state)

    return {"status": "ok", "message": f"Job {job_id} cancelled"}
get_dead_letter_queue function · python · L9880-L9965 (86 LOC)
api/admin.py
async def get_dead_letter_queue(
    request: Request,
    limit: int = Query(default=50, ge=1, le=200, description="Number of entries to return"),
    offset: int = Query(default=0, ge=0, description="Offset for pagination"),
):
    """
    Get dead letter queue entries with monitoring information.

    Returns failed jobs that were moved to the DLQ, including error details
    and timing information for debugging and reprocessing.

    Response includes DLQ depth for alerting thresholds (e.g., >10 jobs).
    """
    from api.redis_client import get_redis

    redis = await get_redis()
    if not redis:
        return {
            "available": False,
            "message": "Redis not available",
            "entries": [],
            "total_count": 0,
            "depth_warning": False,
        }

    try:
        from api.job_queue import DEAD_LETTER_STREAM

        # Get total count for depth monitoring
        dlq_length = await redis.xlen(DEAD_LETTER_STREAM)

        # Read entri
reprocess_dead_letter_job function · python · L9970-L10097 (128 LOC)
api/admin.py
async def reprocess_dead_letter_job(
    request: Request,
    message_id: str = FastAPIPath(..., pattern=r"^\d+-\d+$", description="Redis stream message ID"),
    priority: str = Query(default="normal", pattern="^(high|normal|low)$"),
):
    """
    Reprocess a job from the dead letter queue.

    Moves the job back to the appropriate priority stream for reprocessing.
    The original DLQ entry is removed after successful requeue.

    Args:
        message_id: The Redis stream message ID from the DLQ
        priority: Priority for the requeued job (defaults to original or 'normal')
    """
    from api.job_queue import DEAD_LETTER_STREAM, PRIORITY_STREAMS
    from api.redis_client import get_redis

    redis = await get_redis()
    if not redis:
        raise HTTPException(status_code=503, detail="Redis not available")

    try:
        # Read the specific message from DLQ
        entries = await redis.xrange(
            DEAD_LETTER_STREAM,
            min=message_id,
            ma
delete_dead_letter_entry function · python · L10102-L10146 (45 LOC)
api/admin.py
async def delete_dead_letter_entry(
    request: Request,
    message_id: str = FastAPIPath(..., pattern=r"^\d+-\d+$", description="Redis stream message ID"),
):
    """
    Delete a specific entry from the dead letter queue.

    Use this when a job should not be retried (e.g., intentionally cancelled,
    obsolete, or already manually resolved).
    """
    from api.job_queue import DEAD_LETTER_STREAM
    from api.redis_client import get_redis

    redis = await get_redis()
    if not redis:
        raise HTTPException(status_code=503, detail="Redis not available")

    try:
        # Verify the message exists
        entries = await redis.xrange(
            DEAD_LETTER_STREAM,
            min=message_id,
            max=message_id,
            count=1,
        )

        if not entries:
            raise HTTPException(status_code=404, detail=f"Message {message_id} not found in DLQ")

        # Delete the message
        deleted = await redis.xdel(DEAD_LETTER_STREAM, message_id)

  
get_job_queue_stats function · python · L10151-L10167 (17 LOC)
api/admin.py
async def get_job_queue_stats(request: Request):
    """
    Return the job queue's statistics, annotated with a DLQ depth warning.

    The statistics come from the job queue's ``get_queue_stats()``. Two keys
    are added on top: ``dlq_warning`` (True when the ``dead_letter_queue``
    value exceeds the threshold) and ``dlq_warning_threshold`` (the
    threshold itself).
    """
    warning_threshold = 10

    queue = await get_job_queue()
    queue_stats = await queue.get_queue_stats()

    # A missing "dead_letter_queue" entry is treated as an empty DLQ.
    queue_stats["dlq_warning"] = queue_stats.get("dead_letter_queue", 0) > warning_threshold
    queue_stats["dlq_warning_threshold"] = warning_threshold

    return queue_stats
list_webhooks function · python · L10175-L10231 (57 LOC)
api/admin.py
async def list_webhooks(
    request: Request,
    limit: int = Query(default=50, ge=1, le=100, description="Maximum webhooks to return"),
    offset: int = Query(default=0, ge=0, description="Number of webhooks to skip"),
    active_only: bool = Query(default=False, description="Only return active webhooks"),
) -> WebhookListResponse:
    """
    List all configured webhooks with pagination.

    Returns webhooks with their configuration and delivery statistics.
    Secrets are never included in responses.

    Query Parameters:
    - limit: Maximum webhooks to return (default 50, max 100)
    - offset: Number of webhooks to skip (default 0)
    - active_only: Filter to only active webhooks (default false)
    """
    # Build query with optional filter
    base_query = webhooks.select()
    if active_only:
        base_query = base_query.where(webhooks.c.active == True)  # noqa: E712

    # Get total count for pagination
    count_query = sa.select(sa.func.count()).select_from(webhook
Repobility · severity-and-effort ranking · https://repobility.com
get_webhook_stats function · python · L10236-L10280 (45 LOC)
api/admin.py
async def get_webhook_stats(request: Request) -> WebhookStatsResponse:
    """
    Get webhook system statistics.

    Returns overview statistics including total webhooks,
    pending/failed deliveries, and 24-hour delivery counts.
    """
    now = datetime.now(timezone.utc)
    yesterday = now - timedelta(days=1)

    # Count webhooks
    total_webhooks = await fetch_val_with_retry(sa.select(sa.func.count()).select_from(webhooks))
    active_webhooks = await fetch_val_with_retry(
        sa.select(sa.func.count()).select_from(webhooks).where(webhooks.c.active == True)  # noqa: E712
    )

    # Count deliveries
    pending_deliveries = await fetch_val_with_retry(
        sa.select(sa.func.count()).select_from(webhook_deliveries).where(webhook_deliveries.c.status == "pending")
    )
    failed_deliveries = await fetch_val_with_retry(
        sa.select(sa.func.count())
        .select_from(webhook_deliveries)
        .where(webhook_deliveries.c.status == "failed_permanent")
    )

   
create_webhook function · python · L10285-L10340 (56 LOC)
api/admin.py
async def create_webhook(request: Request, data: WebhookCreate) -> WebhookResponse:
    """
    Create a new webhook subscription.

    Configure a new webhook to receive event notifications.
    Provide a list of event types to subscribe to.
    """
    now = datetime.now(timezone.utc)

    # Prepare headers JSON
    headers_json = json.dumps(data.headers) if data.headers else None

    try:
        result = await database.execute(
            webhooks.insert().values(
                name=data.name,
                url=data.url,
                events=json.dumps(data.events),
                secret=data.secret,
                active=data.active,
                headers=headers_json,
                created_at=now,
                total_deliveries=0,
                successful_deliveries=0,
                failed_deliveries=0,
            )
        )

        webhook_id = result

        await log_audit(
            action=AuditAction.WEBHOOK_CREATE,
            entity_type="webhook"
get_webhook function · python · L10345-L10374 (30 LOC)
api/admin.py
async def get_webhook(request: Request, webhook_id: int) -> WebhookResponse:
    """
    Get a specific webhook by ID.

    Returns the webhook configuration and delivery statistics.
    """
    row = await fetch_one_with_retry(webhooks.select().where(webhooks.c.id == webhook_id))

    if not row:
        raise HTTPException(status_code=404, detail="Webhook not found")

    try:
        events = json.loads(row["events"]) if row["events"] else []
    except (json.JSONDecodeError, TypeError):
        events = []

    return WebhookResponse(
        id=row["id"],
        name=row["name"],
        url=row["url"],
        events=events,
        active=row["active"],
        created_at=row["created_at"],
        updated_at=row["updated_at"],
        last_triggered_at=row["last_triggered_at"],
        total_deliveries=row["total_deliveries"] or 0,
        successful_deliveries=row["successful_deliveries"] or 0,
        failed_deliveries=row["failed_deliveries"] or 0,
        has_secret=bool(r
update_webhook function · python · L10379-L10445 (67 LOC)
api/admin.py
async def update_webhook(request: Request, webhook_id: int, data: WebhookUpdate) -> WebhookResponse:
    """
    Update an existing webhook.

    Only provided fields will be updated.
    Set secret to empty string to remove the signing key.
    """
    row = await fetch_one_with_retry(webhooks.select().where(webhooks.c.id == webhook_id))

    if not row:
        raise HTTPException(status_code=404, detail="Webhook not found")

    now = datetime.now(timezone.utc)
    update_values = {"updated_at": now}

    if data.name is not None:
        update_values["name"] = data.name
    if data.url is not None:
        update_values["url"] = data.url
    if data.events is not None:
        update_values["events"] = json.dumps(data.events)
    if data.secret is not None:
        update_values["secret"] = data.secret if data.secret else None
    if data.active is not None:
        update_values["active"] = data.active
    if data.headers is not None:
        update_values["headers"] = json.dumps
delete_webhook function · python · L10450-L10480 (31 LOC)
api/admin.py
async def delete_webhook(request: Request, webhook_id: int):
    """
    Delete a webhook.

    Permanently removes the webhook and all associated delivery history.
    This action cannot be undone.
    """
    row = await fetch_one_with_retry(webhooks.select().where(webhooks.c.id == webhook_id))

    if not row:
        raise HTTPException(status_code=404, detail="Webhook not found")

    try:
        # Deliveries will be cascade-deleted by FK constraint
        await database.execute(webhooks.delete().where(webhooks.c.id == webhook_id))

        await log_audit(
            action=AuditAction.WEBHOOK_DELETE,
            entity_type="webhook",
            entity_id=webhook_id,
            details={"name": row["name"], "url": row["url"]},
        )

        return {"status": "ok", "message": f"Webhook '{row['name']}' deleted"}

    except Exception as e:
        logger.error(f"Failed to delete webhook {webhook_id}: {e}")
        raise HTTPException(
            status_code=500,
       
test_webhook function · python · L10485-L10511 (27 LOC)
api/admin.py
async def test_webhook(
    request: Request,
    webhook_id: int,
    data: WebhookTestRequest = WebhookTestRequest(),
) -> WebhookTestResponse:
    """
    Fire a test delivery at a configured webhook.

    Confirms the webhook exists, then delegates to
    ``api.webhook_service.test_webhook`` with the requested event type and
    relays its result as a WebhookTestResponse.

    Raises:
        HTTPException: 404 if the webhook does not exist.
    """
    # Imported under an alias because this endpoint shares the service function's name.
    from api.webhook_service import test_webhook as do_test_webhook

    lookup = webhooks.select().where(webhooks.c.id == webhook_id)
    webhook_row = await fetch_one_with_retry(lookup)
    if not webhook_row:
        raise HTTPException(status_code=404, detail="Webhook not found")

    outcome = await do_test_webhook(webhook_id, data.event_type)

    # "success" and "duration_ms" are required in the result; the rest are optional.
    return WebhookTestResponse(
        success=outcome["success"],
        status_code=outcome.get("status_code"),
        response_body=outcome.get("response_body"),
        error_message=outcome.get("error_message"),
        duration_ms=outcome["duration_ms"],
    )
list_webhook_deliveries function · python · L10516-L10572 (57 LOC)
api/admin.py
async def list_webhook_deliveries(
    request: Request,
    webhook_id: int,
    status: Optional[str] = Query(None, pattern="^(pending|delivered|failed|failed_permanent)$"),
    limit: int = Query(50, ge=1, le=200),
    offset: int = Query(0, ge=0),
) -> WebhookDeliveryListResponse:
    """
    List delivery attempts for a webhook.

    Returns delivery history with filtering by status.
    """
    row = await fetch_one_with_retry(webhooks.select().where(webhooks.c.id == webhook_id))

    if not row:
        raise HTTPException(status_code=404, detail="Webhook not found")

    # Build query
    query = (
        webhook_deliveries.select()
        .where(webhook_deliveries.c.webhook_id == webhook_id)
        .order_by(webhook_deliveries.c.created_at.desc())
    )

    count_query = (
        sa.select(sa.func.count()).select_from(webhook_deliveries).where(webhook_deliveries.c.webhook_id == webhook_id)
    )

    if status:
        query = query.where(webhook_deliveries.c.status == st
get_webhook_delivery function · python · L10577-L10616 (40 LOC)
api/admin.py
async def get_webhook_delivery(
    request: Request,
    webhook_id: int,
    delivery_id: int,
) -> WebhookDeliveryDetailResponse:
    """
    Get detailed information about a specific delivery.

    Includes the full event data and request/response bodies.
    """
    row = await fetch_one_with_retry(
        webhook_deliveries.select()
        .where(webhook_deliveries.c.id == delivery_id)
        .where(webhook_deliveries.c.webhook_id == webhook_id)
    )

    if not row:
        raise HTTPException(status_code=404, detail="Delivery not found")

    try:
        event_data = json.loads(row["event_data"]) if row["event_data"] else None
    except (json.JSONDecodeError, TypeError):
        event_data = None

    return WebhookDeliveryDetailResponse(
        id=row["id"],
        webhook_id=row["webhook_id"],
        event_type=row["event_type"],
        status=row["status"],
        attempt_number=row["attempt_number"],
        response_status=row["response_status"],
        error_m
Repobility (the analyzer behind this table) · https://repobility.com
retry_webhook_delivery function · python · L10621-L10671 (51 LOC)
api/admin.py
async def retry_webhook_delivery(
    request: Request,
    webhook_id: int,
    delivery_id: int,
):
    """
    Retry a failed webhook delivery.

    Resets the delivery status to pending and schedules immediate retry.
    """
    row = await fetch_one_with_retry(
        webhook_deliveries.select()
        .where(webhook_deliveries.c.id == delivery_id)
        .where(webhook_deliveries.c.webhook_id == webhook_id)
    )

    if not row:
        raise HTTPException(status_code=404, detail="Delivery not found")

    if row["status"] == "delivered":
        raise HTTPException(status_code=400, detail="Cannot retry a delivered webhook")

    now = datetime.now(timezone.utc)

    try:
        await database.execute(
            webhook_deliveries.update()
            .where(webhook_deliveries.c.id == delivery_id)
            .values(
                status="pending",
                attempt_number=1,  # Reset attempts
                next_retry_at=now,
                error_message=None,
_parse_qualities_json function · python · L10680-L10687 (8 LOC)
api/admin.py
def _parse_qualities_json(qualities_str: Optional[str]) -> List[str]:
    """Parse a JSON-encoded qualities string into a list.

    Args:
        qualities_str: JSON text expected to encode a list, or None/empty.

    Returns:
        The decoded list. An empty list is returned when the input is
        falsy, is not valid JSON, is not something ``json.loads`` accepts,
        or decodes to anything other than a list (e.g. ``null``, an object
        or a bare scalar).
    """
    if not qualities_str:
        return []
    try:
        parsed = json.loads(qualities_str)
    except (json.JSONDecodeError, TypeError):
        # TypeError covers non-str/bytes input, matching the other JSON
        # parsing in this file.
        return []
    # Valid JSON is not necessarily a list; never hand callers None/dict/str.
    return parsed if isinstance(parsed, list) else []
_build_live_stream_response function · python · L10690-L10709 (20 LOC)
api/admin.py
def _build_live_stream_response(row: dict) -> LiveStreamResponse:
    """Convert a database row mapping into a LiveStreamResponse.

    Most columns are copied through unchanged. Three are adapted on the way:
    a falsy ``description`` becomes ``""``, ``status`` is wrapped in
    LiveStreamStatus, and ``qualities`` is decoded by _parse_qualities_json.
    """
    adapters = {
        "description": lambda text: text or "",
        "status": LiveStreamStatus,
        "qualities": _parse_qualities_json,
    }
    columns = (
        "id",
        "title",
        "slug",
        "description",
        "status",
        "qualities",
        "category_id",
        "dvr_enabled",
        "dvr_window_seconds",
        "auto_record_vod",
        "segment_count",
        "vod_video_id",
        "created_at",
        "started_at",
        "ended_at",
        "last_segment_at",
    )

    payload = {}
    for column in columns:
        value = row[column]
        adapt = adapters.get(column)
        payload[column] = value if adapt is None else adapt(value)

    return LiveStreamResponse(**payload)
list_live_streams function · python · L10714-L10735 (22 LOC)
api/admin.py
async def list_live_streams(
    request: Request,
    status: Optional[str] = Query(None, description="Filter by status (idle, live, ending, ended)"),
) -> LiveStreamListResponse:
    """Return every live stream, newest first, optionally filtered by status.

    Raises:
        HTTPException: 503 when live streaming is disabled, 400 when
            ``status`` is given but is not one of the accepted values.
    """
    if not LIVE_ENABLED:
        raise HTTPException(status_code=503, detail="Live streaming is disabled")

    stream_query = live_streams.select().order_by(live_streams.c.created_at.desc())

    if status:
        allowed = ("idle", "live", "ending", "ended")
        if status not in allowed:
            raise HTTPException(status_code=400, detail=f"Invalid status. Must be one of: {', '.join(allowed)}")
        stream_query = stream_query.where(live_streams.c.status == status)

    records = await fetch_all_with_retry(stream_query)
    responses = [_build_live_stream_response(dict(record)) for record in records]

    # No pagination here, so the total is simply the number of rows fetched.
    return LiveStreamListResponse(streams=responses, total=len(records))
create_live_stream function · python · L10740-L10820 (81 LOC)
api/admin.py
async def create_live_stream(request: Request, data: LiveStreamCreate) -> LiveStreamCreatedResponse:
    """Create a new live stream. Returns the stream key (shown once)."""
    if not LIVE_ENABLED:
        raise HTTPException(status_code=503, detail="Live streaming is disabled")

    # Check concurrent stream limit
    active_count = await database.fetch_val(
        sa.select(sa.func.count()).select_from(live_streams).where(
            live_streams.c.status.in_(["idle", "live", "ending"])
        )
    )
    if active_count >= LIVE_MAX_CONCURRENT_STREAMS:
        raise HTTPException(
            status_code=429,
            detail=f"Maximum concurrent streams ({LIVE_MAX_CONCURRENT_STREAMS}) reached",
        )

    # Generate slug
    slug = slugify(data.title)

    # Check for duplicate slug
    existing = await fetch_one_with_retry(live_streams.select().where(live_streams.c.slug == slug))
    if existing:
        # Append random suffix
        slug = f"{slug}-{secrets.token_hex(4)
get_live_stream function · python · L10825-L10834 (10 LOC)
api/admin.py
async def get_live_stream(request: Request, slug: str) -> LiveStreamResponse:
    """Fetch one live stream by its slug.

    Raises:
        HTTPException: 503 when live streaming is disabled, 404 when no
            stream has the given slug.
    """
    if not LIVE_ENABLED:
        raise HTTPException(status_code=503, detail="Live streaming is disabled")

    by_slug = live_streams.select().where(live_streams.c.slug == slug)
    stream_row = await fetch_one_with_retry(by_slug)

    if stream_row:
        return _build_live_stream_response(dict(stream_row))
    raise HTTPException(status_code=404, detail="Stream not found")
update_live_stream function · python · L10839-L10882 (44 LOC)
api/admin.py
async def update_live_stream(request: Request, slug: str, data: LiveStreamUpdate) -> LiveStreamResponse:
    """Update a live stream."""
    if not LIVE_ENABLED:
        raise HTTPException(status_code=503, detail="Live streaming is disabled")

    row = await fetch_one_with_retry(live_streams.select().where(live_streams.c.slug == slug))
    if not row:
        raise HTTPException(status_code=404, detail="Stream not found")

    # Build update values
    update_values = {}
    if data.title is not None:
        update_values["title"] = data.title
    if data.description is not None:
        update_values["description"] = data.description
    if data.category_id is not None:
        update_values["category_id"] = data.category_id
    if data.dvr_enabled is not None:
        update_values["dvr_enabled"] = data.dvr_enabled
    if data.dvr_window_seconds is not None:
        update_values["dvr_window_seconds"] = data.dvr_window_seconds
    if data.auto_record_vod is not None:
        updat
regenerate_stream_key function · python · L10887-L10922 (36 LOC)
api/admin.py
async def regenerate_stream_key(request: Request, slug: str) -> LiveStreamKeyRegenerateResponse:
    """Regenerate the stream key for a live stream."""
    if not LIVE_ENABLED:
        raise HTTPException(status_code=503, detail="Live streaming is disabled")

    row = await fetch_one_with_retry(live_streams.select().where(live_streams.c.slug == slug))
    if not row:
        raise HTTPException(status_code=404, detail="Stream not found")

    # Cannot regenerate key for live stream
    if row["status"] == "live":
        raise HTTPException(status_code=400, detail="Cannot regenerate key while stream is live")

    # Generate new key
    new_key = generate_stream_key()
    new_hash = hash_stream_key(new_key)
    new_prefix = get_key_prefix(new_key)

    await db_execute_with_retry(
        live_streams.update()
        .where(live_streams.c.slug == slug)
        .values(stream_key_hash=new_hash, stream_key_prefix=new_prefix)
    )

    # Audit log
    log_audit(
        AuditAction.UPD
Citation: Repobility (2026). State of AI-Generated Code. https://repobility.com/research/
end_live_stream function · python · L10927-L10974 (48 LOC)
api/admin.py
async def end_live_stream(request: Request, slug: str) -> LiveStreamResponse:
    """End a live stream. This triggers VOD recording if enabled."""
    if not LIVE_ENABLED:
        raise HTTPException(status_code=503, detail="Live streaming is disabled")

    row = await fetch_one_with_retry(live_streams.select().where(live_streams.c.slug == slug))
    if not row:
        raise HTTPException(status_code=404, detail="Stream not found")

    if row["status"] == "ended":
        raise HTTPException(status_code=400, detail="Stream has already ended")

    # Revoke the stream key atomically
    await revoke_stream_key(row["id"])

    # Fetch updated row
    updated_row = await fetch_one_with_retry(live_streams.select().where(live_streams.c.slug == slug))

    # Audit log
    log_audit(
        AuditAction.UPDATE,
        client_ip=get_real_ip(request),
        user_agent=request.headers.get("user-agent"),
        resource_type="live_stream",
        resource_id=row["id"],
        resource_na
delete_live_stream function · python · L10979-L11020 (42 LOC)
api/admin.py
async def delete_live_stream(request: Request, slug: str):
    """Delete a live stream and its segments."""
    if not LIVE_ENABLED:
        raise HTTPException(status_code=503, detail="Live streaming is disabled")

    row = await fetch_one_with_retry(live_streams.select().where(live_streams.c.slug == slug))
    if not row:
        raise HTTPException(status_code=404, detail="Stream not found")

    # Cannot delete live stream
    if row["status"] == "live":
        raise HTTPException(status_code=400, detail="Cannot delete while stream is live. End it first.")

    stream_id = row["id"]

    # Delete segments from database (CASCADE should handle this, but be explicit)
    await db_execute_with_retry(
        live_stream_segments.delete().where(live_stream_segments.c.stream_id == stream_id)
    )

    # Delete stream from database
    await db_execute_with_retry(live_streams.delete().where(live_streams.c.id == stream_id))

    # Delete storage directory
    stream_dir = LIVE_STORAGE_P
get_comments_moderation_queue function · python · L11030-L11157 (128 LOC)
api/admin.py
async def get_comments_moderation_queue(
    request: Request,
    status: Optional[str] = Query(None, description="Filter by status: pending, approved, rejected, spam"),
    video_id: Optional[int] = Query(None, description="Filter by video ID"),
    limit: int = Query(50, ge=1, le=100),
    cursor: Optional[str] = Query(None),
) -> CommentModerationQueueResponse:
    """
    Get comments for moderation (admin only).

    Returns paginated list of comments, optionally filtered by status or video.
    Default shows all non-deleted comments, ordered by newest first.
    """
    # Build base query
    query = (
        sa.select(
            comments.c.id,
            comments.c.video_id,
            comments.c.user_id,
            comments.c.content,
            comments.c.status,
            comments.c.video_timestamp,
            comments.c.depth,
            comments.c.path,
            comments.c.parent_id,
            comments.c.created_at,
            comments.c.updated_at,
      
moderate_comment function · python · L11162-L11246 (85 LOC)
api/admin.py
async def moderate_comment(
    request: Request,
    comment_id: int,
    body: CommentModerate,
) -> CommentResponse:
    """
    Moderate a comment (admin only).

    Changes comment status to approved, rejected, or spam.
    Only admins with COMMENT_MODERATE permission can use this endpoint.
    """
    # Fetch the comment with user info
    query = (
        sa.select(
            comments.c.id,
            comments.c.video_id,
            comments.c.user_id,
            comments.c.content,
            comments.c.status,
            comments.c.video_timestamp,
            comments.c.depth,
            comments.c.path,
            comments.c.parent_id,
            comments.c.created_at,
            comments.c.updated_at,
            comments.c.deleted_at,
            users.c.username,
            users.c.display_name,
        )
        .select_from(comments.join(users, comments.c.user_id == users.c.id))
        .where(comments.c.id == comment_id)
    )
    comment = await database.
force_delete_comment function · python · L11251-L11316 (66 LOC)
api/admin.py
async def force_delete_comment(
    request: Request,
    comment_id: int,
) -> dict:
    """
    Permanently delete a comment (admin only).

    Unlike the public delete endpoint which soft-deletes, this permanently
    removes the comment and all its replies from the database.
    """
    # Fetch the comment
    comment = await database.fetch_one(
        comments.select().where(comments.c.id == comment_id)
    )

    if not comment:
        raise HTTPException(status_code=404, detail="Comment not found")

    # Delete all child comments first (by path prefix)
    # Then delete the comment itself
    # The ON DELETE CASCADE should handle this, but let's be explicit for audit purposes

    # Get count of replies that will be deleted
    if comment["path"]:
        reply_count_query = (
            sa.select(sa.func.count())
            .select_from(comments)
            .where(comments.c.path.op("<@")(comment["path"]))
            .where(comments.c.id != comment_id)
        )
        
_get_last_restore_time function · python · L11327-L11342 (16 LOC)
api/admin.py
async def _get_last_restore_time() -> Optional[datetime]:
    """
    Get the last restore time from database settings.

    The value stored under RESTORE_RATE_LIMIT_KEY is parsed as an ISO-8601
    timestamp after stripping any surrounding double quotes.

    Returns:
        Timezone-aware last restore datetime, or None if never restored or
        the stored value cannot be parsed.
    """
    result = await fetch_one_with_retry(
        settings.select().where(settings.c.key == RESTORE_RATE_LIMIT_KEY)
    )
    if not result or not result["value"]:
        return None

    try:
        last_restore = datetime.fromisoformat(result["value"].strip('"'))
    except (ValueError, TypeError, AttributeError):
        # AttributeError: the stored value is not a string (no .strip()).
        return None

    if last_restore.tzinfo is None:
        # The restore cooldown check subtracts this from
        # datetime.now(timezone.utc); mixing naive and aware datetimes raises
        # TypeError. NOTE(review): naive values are assumed to be UTC — confirm.
        last_restore = last_restore.replace(tzinfo=timezone.utc)
    return last_restore
_set_last_restore_time function · python · L11345-L11377 (33 LOC)
api/admin.py
async def _set_last_restore_time(restore_time: datetime) -> None:
    """
    Set the last restore time in database settings.

    Args:
        restore_time: The time of the restore operation
    """
    value = restore_time.isoformat()

    # Check if setting exists
    existing = await fetch_one_with_retry(
        settings.select().where(settings.c.key == RESTORE_RATE_LIMIT_KEY)
    )

    if existing:
        # Update existing setting
        await execute_with_retry(
            settings.update()
            .where(settings.c.key == RESTORE_RATE_LIMIT_KEY)
            .values(value=value, updated_at=datetime.now(timezone.utc))
        )
    else:
        # Insert new setting
        await execute_with_retry(
            settings.insert().values(
                key=RESTORE_RATE_LIMIT_KEY,
                value=value,
                category="backup",
                description="Last backup restore time (for rate limiting)",
                value_type="string",
                u
create_backup function · python · L11382-L11474 (93 LOC)
api/admin.py
async def create_backup(request: Request, data: BackupCreateRequest) -> BackupResponse:
    """
    Create a new backup.

    Creates a backup of the database and optionally video files.
    The backup runs in the background and status can be monitored via GET /backups/{backup_id}.
    """
    if not BACKUP_ENABLED:
        raise HTTPException(status_code=403, detail="Backup functionality is disabled")

    from backup.service import BackupService
    from backup.manifest import BackupType

    # Map request type to BackupType
    backup_type_map = {
        "full": BackupType.FULL,
        "database_only": BackupType.DATABASE_ONLY,
        "incremental": BackupType.INCREMENTAL,
    }
    backup_type = backup_type_map.get(data.backup_type.value, BackupType.DATABASE_ONLY)

    # Get username from session if available
    created_by = "api"
    session_token = request.cookies.get("vlog_admin_session")
    if session_token:
        session = await fetch_one_with_retry(
            admin_s
Same scanner, your repo: https://repobility.com — Repobility
list_backups function · python · L11479-L11538 (60 LOC)
api/admin.py
async def list_backups(
    request: Request,
    status: Optional[str] = Query(None, description="Filter by status"),
    backup_type: Optional[str] = Query(None, description="Filter by backup type"),
    limit: int = Query(50, ge=1, le=100, description="Maximum number of backups to return"),
    offset: int = Query(0, ge=0, description="Offset for pagination"),
) -> BackupListResponse:
    """
    List all backups.

    Returns a list of backups ordered by creation date (newest first).
    """
    if not BACKUP_ENABLED:
        raise HTTPException(status_code=403, detail="Backup functionality is disabled")

    query = backups.select().order_by(backups.c.created_at.desc())

    # Apply filters
    if status:
        query = query.where(backups.c.status == status)
    if backup_type:
        query = query.where(backups.c.backup_type == backup_type)

    # Get total count
    count_query = sa.select(sa.func.count()).select_from(backups)
    if status:
        count_query = count_query.
get_backup function · python · L11543-L11581 (39 LOC)
api/admin.py
async def get_backup(
    request: Request,
    backup_id: str = FastAPIPath(..., description="Backup ID"),
) -> BackupResponse:
    """
    Get backup details.

    Returns detailed information about a specific backup.
    """
    if not BACKUP_ENABLED:
        raise HTTPException(status_code=403, detail="Backup functionality is disabled")

    backup = await fetch_one_with_retry(
        backups.select().where(backups.c.backup_id == backup_id)
    )

    if not backup:
        raise HTTPException(status_code=404, detail="Backup not found")

    return BackupResponse(
        backup_id=backup["backup_id"],
        backup_type=backup["backup_type"],
        status=backup["status"],
        size_bytes=backup["size_bytes"],
        database_size_bytes=backup["database_size_bytes"],
        files_size_bytes=backup["files_size_bytes"],
        video_count=backup["video_count"],
        file_count=backup["file_count"],
        description=backup["description"],
        local_path=backup["lo
restore_backup function · python · L11586-L11676 (91 LOC)
api/admin.py
async def restore_backup(
    request: Request,
    backup_id: str = FastAPIPath(..., description="Backup ID to restore"),
    data: BackupRestoreRequest = None,
) -> BackupRestoreResponse:
    """
    Restore from a backup.

    WARNING: This operation will replace the current database and optionally files.
    A safety backup is created before restoration. Rate limited to 1 per hour.

    Use dry_run=true to verify the backup without performing the actual restore.
    """
    if not BACKUP_ENABLED:
        raise HTTPException(status_code=403, detail="Backup functionality is disabled")

    if data is None:
        data = BackupRestoreRequest()

    # Additional rate limiting check (cooldown) - uses database for multi-process safety
    if not data.dry_run:
        last_restore_time = await _get_last_restore_time()
        if last_restore_time:
            elapsed = (datetime.now(timezone.utc) - last_restore_time).total_seconds()
            if elapsed < BACKUP_RESTORE_COOLDOWN_SECOND
verify_backup function · python · L11681-L11738 (58 LOC)
api/admin.py
async def verify_backup(
    request: Request,
    backup_id: str = FastAPIPath(..., description="Backup ID to verify"),
) -> BackupVerifyResponse:
    """
    Verify backup integrity.

    Checks archive integrity, manifest signature, and file checksums.
    """
    if not BACKUP_ENABLED:
        raise HTTPException(status_code=403, detail="Backup functionality is disabled")

    # Verify backup exists
    backup = await fetch_one_with_retry(
        backups.select().where(backups.c.backup_id == backup_id)
    )

    if not backup:
        raise HTTPException(status_code=404, detail="Backup not found")

    from backup.verify import BackupVerifier

    try:
        verifier = BackupVerifier(backup_path=BACKUP_PATH)
        result = await verifier.verify_backup(
            backup_id=backup_id,
            signing_key=BACKUP_SIGNING_KEY or None,
            verify_files=True,
        )

        # Audit log
        log_audit(
            AuditAction.BACKUP_VERIFY,
            client_ip=
delete_backup function · python · L11743-L11792 (50 LOC)
api/admin.py
async def delete_backup(
    request: Request,
    backup_id: str = FastAPIPath(..., description="Backup ID to delete"),
    delete_from_s3: bool = Query(False, description="Also delete from S3"),
) -> dict:
    """
    Delete a backup.

    Removes the backup from the local filesystem and optionally from S3.
    """
    if not BACKUP_ENABLED:
        raise HTTPException(status_code=403, detail="Backup functionality is disabled")

    # Verify backup exists
    backup = await fetch_one_with_retry(
        backups.select().where(backups.c.backup_id == backup_id)
    )

    if not backup:
        raise HTTPException(status_code=404, detail="Backup not found")

    from backup.service import BackupService

    try:
        service = BackupService(
            database_url=DATABASE_URL,
            videos_dir=VIDEOS_DIR,
            backup_path=BACKUP_PATH,
        )

        await service.delete_backup(backup_id, delete_remote=delete_from_s3)

        # Audit log
        log_audit(
      
AnalyticsCache.__init__ method · python · L60-L72 (13 LOC)
api/analytics_cache.py
    def __init__(
        self,
        ttl_seconds: int = DEFAULT_CACHE_TTL_SECONDS,
        enabled: bool = True,
        max_size: int = DEFAULT_CACHE_MAX_SIZE,
    ):
        """
        Set up an empty in-memory cache.

        Args:
            ttl_seconds: Lifetime of a cache entry, in seconds
            enabled: When False the cache is switched off
            max_size: Entry-count ceiling used to decide when to evict
        """
        # Configuration first, storage last; the assignments are independent.
        self._enabled = enabled
        self._ttl = ttl_seconds
        self._max_size = max_size
        # Maps cache key -> per-entry record dict.
        self._cache: Dict[str, Dict[str, Any]] = dict()
AnalyticsCache.get method · python · L74-L97 (24 LOC)
api/analytics_cache.py
    def get(self, key: str) -> Optional[Any]:
        """
        Look up a cached value.

        Args:
            key: Cache key

        Returns:
            The stored value when the key is present and still within its
            TTL; None when caching is disabled, the key is absent, or the
            entry has expired (an expired entry is removed as a side effect).
        """
        if not self._enabled:
            return None

        entry = self._cache.get(key)
        if entry is None:
            return None

        # An entry is stale once its age strictly exceeds the TTL.
        expired = (time.time() - entry["timestamp"]) > self._ttl
        if not expired:
            return entry["data"]

        # Evict the stale entry so it is not examined again.
        del self._cache[key]
        return None
AnalyticsCache.set method · python · L99-L131 (33 LOC)
api/analytics_cache.py
    def set(self, key: str, value: Any) -> None:
        """
        Set a value in the cache.

        Args:
            key: Cache key
            value: Value to cache
        """
        if not self._enabled:
            return

        # Probabilistic cleanup to amortize cleanup cost and prevent unbounded growth
        if random.random() < self.CLEANUP_PROBABILITY:
            self.cleanup_expired()

        # Evict oldest entries if at capacity (only when adding new keys)
        if key not in self._cache and len(self._cache) >= self._max_size:
            self.cleanup_expired()

            # If still at capacity after cleanup, remove oldest 10% via LRU
            # Note: This uses O(n log n) sorting but only occurs when cache has
            # max_size non-expired entries, which is rare due to probabilistic cleanup
            # and TTL expiration. For default max_size of 1000, performance is acceptable.
            if len(self._cache) >= self._max_size:
                items
Repobility · severity-and-effort ranking · https://repobility.com
AnalyticsCache.invalidate method · python · L137-L145 (9 LOC)
api/analytics_cache.py
    def invalidate(self, key: str) -> None:
        """
        Drop a single entry from the cache.

        A key that is not currently cached is silently ignored.

        Args:
            key: Cache key to invalidate
        """
        # pop() with a default removes the entry if present and is a no-op otherwise.
        self._cache.pop(key, None)
AnalyticsCache.cleanup_expired method · python · L147-L163 (17 LOC)
api/analytics_cache.py
    def cleanup_expired(self) -> int:
        """
        Purge every entry whose age exceeds the TTL.

        Returns:
            Number of entries removed (always 0 when caching is disabled)
        """
        if not self._enabled:
            return 0

        now = time.time()

        def is_stale(entry) -> bool:
            # Same strict comparison as the per-key expiry check in get().
            return now - entry["timestamp"] > self._ttl

        # Collect first, delete afterwards: the dict must not change size
        # while it is being iterated.
        stale = [name for name, entry in self._cache.items() if is_stale(entry)]
        for name in stale:
            del self._cache[name]

        return len(stale)
AnalyticsCache.get_stats method · python · L165-L178 (14 LOC)
api/analytics_cache.py
    def get_stats(self) -> Dict[str, Any]:
        """
        Report the cache's configuration and current size.

        Returns:
            Dict with the enabled flag, TTL in seconds, current entry count,
            configured max size, and the backend name ("memory")
        """
        stats: Dict[str, Any] = dict(
            enabled=self._enabled,
            ttl_seconds=self._ttl,
            entry_count=len(self._cache),
            max_size=self._max_size,
            backend="memory",
        )
        return stats
RedisAnalyticsCache.__init__ method · python · L193-L216 (24 LOC)
api/analytics_cache.py
    def __init__(
        self,
        redis_url: str,
        ttl_seconds: int = DEFAULT_CACHE_TTL_SECONDS,
        enabled: bool = True,
    ):
        """
        Set up the Redis-backed cache.

        When ``enabled`` is true, ``_initialize_client()`` is called right
        away; otherwise no client is created.

        Args:
            redis_url: Redis connection URL (e.g., "redis://localhost:6379")
            ttl_seconds: Lifetime of a cache entry, in seconds
            enabled: Whether caching is enabled
        """
        # Caller-supplied configuration.
        self._redis_url = redis_url
        self._enabled = enabled
        self._ttl = ttl_seconds

        # Connection state: no client yet, no failure recorded.
        self._client: Optional[Any] = None
        self._connection_failed = False

        # Reconnect bookkeeping: no attempt made so far, backoff at its minimum.
        self._reconnect_backoff = REDIS_RECONNECT_MIN_INTERVAL
        self._last_reconnect_attempt: Optional[float] = None

        if not enabled:
            return
        self._initialize_client()
RedisAnalyticsCache._initialize_client method · python · L218-L250 (33 LOC)
api/analytics_cache.py
    def _initialize_client(self) -> bool:
        """
        Initialize the Redis client.

        Returns:
            True if connection successful, False otherwise.
        """
        try:
            import redis as redis_module  # Lazy import
            self._client = redis_module.Redis.from_url(
                self._redis_url,
                socket_timeout=REDIS_SOCKET_TIMEOUT,
                socket_connect_timeout=REDIS_CONNECT_TIMEOUT,
                decode_responses=True,
            )
            # Test connection (respects socket_timeout)
            self._client.ping()
            logger.info(f"Redis analytics cache connected: {self._redis_url.split('@')[-1]}")
            self._connection_failed = False
            self._reconnect_backoff = REDIS_RECONNECT_MIN_INTERVAL
            return True
        except ImportError as e:
            # Redis package not installed - graceful degradation
            logger.warning(f"Redis package not available: {e}")
            se
RedisAnalyticsCache._maybe_reconnect method · python · L252-L282 (31 LOC)
api/analytics_cache.py
    def _maybe_reconnect(self) -> bool:
        """
        Attempt reconnection if enough time has passed since last attempt.

        Uses exponential backoff to avoid hammering a failing Redis server.

        Returns:
            True if connected (either already or after reconnect), False otherwise.
        """
        if not self._connection_failed:
            return self._client is not None

        now = time.time()
        if self._last_reconnect_attempt is not None:
            elapsed = now - self._last_reconnect_attempt
            if elapsed < self._reconnect_backoff:
                return False

        self._last_reconnect_attempt = now
        logger.info(f"Attempting Redis reconnection (backoff: {self._reconnect_backoff}s)")

        if self._initialize_client():
            logger.info("Redis analytics cache reconnected successfully")
            return True

        # Increase backoff for next attempt (exponential with cap)
        self._reconnect_backoff = min(
  
RedisAnalyticsCache._safe_redis_call method · python · L288-L328 (41 LOC)
api/analytics_cache.py
    def _safe_redis_call(
        self,
        operation: Callable[[], T],
        operation_name: str,
        fallback: Optional[T] = None
    ) -> Optional[T]:
        """
        Execute a Redis operation with error handling and automatic reconnection.

        Args:
            operation: The Redis operation to execute
            operation_name: Name for logging purposes
            fallback: Value to return on failure

        Returns:
            Operation result or fallback value on failure.
        """
        if not self._enabled:
            return fallback

        # Attempt reconnection if previously failed
        if self._client is None or self._connection_failed:
            if not self._maybe_reconnect():
                return fallback

        try:
            return operation()
        except RedisError as e:
            # Redis-specific errors (connection issues, protocol errors, etc.)
            logger.warning(f"Redis analytics cache {operation_name} failed: {e
RedisAnalyticsCache.get method · python · L330-L360 (31 LOC)
api/analytics_cache.py
    def get(self, key: str) -> Optional[Any]:
        """
        Get a value from the cache.

        Args:
            key: Cache key

        Returns:
            Cached value if exists and not expired, None otherwise
        """
        def get_operation():
            full_key = self._get_full_key(key)
            data = self._client.get(full_key)
            if data is None:
                return None
            try:
                return json.loads(data)
            except (json.JSONDecodeError, TypeError, ValueError) as e:
                # Corrupted cache entry - delete it to prevent repeated failures
                logger.warning(f"Corrupted cache entry for key '{key}': {e}")
                try:
                    self._client.delete(full_key)
                except (RedisError, OSError, TimeoutError):
                    pass  # Best effort cleanup
                return None

        return self._safe_redis_call(
            get_operation,
            "get",
         
Repobility (the analyzer behind this table) · https://repobility.com
RedisAnalyticsCache.set method · python · L362-L377 (16 LOC)
api/analytics_cache.py
    def set(self, key: str, value: Any) -> None:
        """
        Store a value in the cache.

        The value is JSON-serialized and written with SETEX, using the
        configured TTL as its expiry. The write is routed through
        ``_safe_redis_call`` and its result is discarded.

        Args:
            key: Cache key
            value: Value to cache
        """
        def store():
            # Serialization happens inside the operation so that it runs
            # under _safe_redis_call, exactly like the Redis write itself.
            return self._client.setex(
                self._get_full_key(key),
                self._ttl,
                json.dumps(value),
            )

        self._safe_redis_call(store, "set")
RedisAnalyticsCache.clear method · python · L379-L406 (28 LOC)
api/analytics_cache.py
    def clear(self) -> None:
        """
        Clear all analytics cache entries.

        Uses a timeout to prevent blocking indefinitely on large keyspaces.
        If the operation times out, some keys may remain.
        """
        def operation():
            start_time = time.time()
            cursor = 0
            pattern = f"{self.CACHE_KEY_PREFIX}*"
            keys_deleted = 0
            while True:
                # Check timeout to prevent blocking on large keyspaces
                if time.time() - start_time > REDIS_BULK_OPERATION_TIMEOUT:
                    logger.warning(
                        f"Redis clear timed out after {REDIS_BULK_OPERATION_TIMEOUT}s, "
                        f"deleted {keys_deleted} keys, some may remain"
                    )
                    return
                cursor, keys = self._client.scan(cursor, match=pattern, count=REDIS_SCAN_BATCH_SIZE)
                if keys:
                    self._client.delete(*keys)
               
RedisAnalyticsCache.invalidate method · python · L408-L418 (11 LOC)
api/analytics_cache.py
    def invalidate(self, key: str) -> None:
        """
        Remove one entry from the cache.

        Issues a DELETE for the fully-qualified key through
        ``_safe_redis_call``; the result is discarded.

        Args:
            key: Cache key to invalidate
        """
        def remove():
            return self._client.delete(self._get_full_key(key))

        self._safe_redis_call(remove, "invalidate")
‹ prevpage 4 / 20next ›