← back to filthyrake__vlog

Function bodies 1,000 total

All specs Real LLM only Function bodies
bulk_restore_videos function · python · L3204-L3304 (101 LOC)
api/admin.py
async def bulk_restore_videos(request: Request, data: BulkRestoreRequest) -> BulkRestoreResponse:
    """
    Restore multiple soft-deleted videos from archive.
    """
    bulk_operation_id = str(uuid.uuid4())
    results = []
    restored_count = 0
    failed_count = 0
    client_ip = get_real_ip(request)
    user_agent = request.headers.get("user-agent")

    for video_id in data.video_ids:
        try:
            row = await database.fetch_one(videos.select().where(videos.c.id == video_id))
            if not row:
                results.append(BulkOperationResult(video_id=video_id, success=False, error="Video not found"))
                failed_count += 1
                continue

            if not row["deleted_at"]:
                results.append(BulkOperationResult(video_id=video_id, success=False, error="Video is not deleted"))
                failed_count += 1
                continue

            original_deleted_at = row["deleted_at"]

            # Update database first
 
restore_video function · python · L3309-L3389 (81 LOC)
api/admin.py
async def restore_video(request: Request, video_id: int):
    """Restore a soft-deleted video from archive."""
    row = await database.fetch_one(videos.select().where(videos.c.id == video_id))
    if not row:
        raise HTTPException(status_code=404, detail="Video not found")

    if not row["deleted_at"]:
        raise HTTPException(status_code=400, detail="Video is not deleted")

    # Store original deleted_at for potential rollback
    original_deleted_at = row["deleted_at"]

    # Update database FIRST to avoid inconsistent state if file ops fail
    await database.execute(videos.update().where(videos.c.id == video_id).values(deleted_at=None))

    archive_video_dir = ARCHIVE_DIR / row["slug"]
    video_dir = VIDEOS_DIR / row["slug"]
    moved_files = []  # Track what we moved for rollback

    try:
        # Move video files back from archive
        if archive_video_dir.exists():
            shutil.move(str(archive_video_dir), str(video_dir))
            moved_files.append((
retry_video function · python · L3394-L3434 (41 LOC)
api/admin.py
async def retry_video(request: Request, video_id: int):
    """Retry processing a failed video."""
    row = await database.fetch_one(videos.select().where(videos.c.id == video_id))
    if not row:
        raise HTTPException(status_code=404, detail="Video not found")

    if row["status"] != VideoStatus.FAILED:
        raise HTTPException(status_code=400, detail="Video is not in failed state")

    # Check if source file exists
    source_exists = False
    for ext in SUPPORTED_VIDEO_EXTENSIONS:
        if (UPLOADS_DIR / f"{video_id}{ext}").exists():
            source_exists = True
            break

    if not source_exists:
        raise HTTPException(status_code=400, detail="Source file no longer exists")

    # Reset status to pending
    await database.execute(
        videos.update()
        .where(videos.c.id == video_id)
        .values(
            status=VideoStatus.PENDING,
            error_message=None,
        )
    )

    # Audit log
    log_audit(
        AuditAction.
re_upload_video function · python · L3439-L3567 (129 LOC)
api/admin.py
async def re_upload_video(
    request: Request,
    video_id: int,
    file: UploadFile = File(...),
):
    """
    Re-upload a video file, replacing the existing transcoded content.

    This will:
    - Delete all existing transcoded files (HLS segments, playlists, thumbnail)
    - Delete video_qualities, transcoding_jobs, quality_progress, transcriptions
    - Save the new file and queue for reprocessing
    - Preserve: title, description, category, published_at, created_at, slug
    """
    # Early rejection based on Content-Length header (if provided)
    validate_content_length(request)

    # Validate file extension
    file_ext = Path(file.filename).suffix.lower() if file.filename else ""
    if not file_ext:
        file_ext = ".mp4"
    if file_ext not in ALLOWED_VIDEO_EXTENSIONS:
        raise HTTPException(
            status_code=400,
            detail=f"Invalid file type '{file_ext}'. Allowed: {', '.join(sorted(ALLOWED_VIDEO_EXTENSIONS))}",
        )

    # Get video in
get_video_progress function · python · L3572-L3626 (55 LOC)
api/admin.py
async def get_video_progress(request: Request, video_id: int) -> TranscodingProgressResponse:
    """Get transcoding progress for a video."""
    video = await database.fetch_one(videos.select().where(videos.c.id == video_id))

    if not video:
        raise HTTPException(status_code=404, detail="Video not found")

    # If video is ready or failed, return simple status
    if video["status"] in [VideoStatus.READY, VideoStatus.FAILED]:
        return TranscodingProgressResponse(
            status=video["status"],
            progress_percent=100 if video["status"] == VideoStatus.READY else 0,
            last_error=sanitize_progress_error(video["error_message"])
            if video["status"] == VideoStatus.FAILED
            else None,
        )

    # If pending, return basic pending status
    if video["status"] == VideoStatus.PENDING:
        return TranscodingProgressResponse(
            status=VideoStatus.PENDING,
            progress_percent=0,
        )

    # Get job info f
get_video_qualities function · python · L3631-L3665 (35 LOC)
api/admin.py
async def get_video_qualities(request: Request, video_id: int) -> VideoQualitiesResponse:
    """Get available and existing qualities for a video."""
    video = await database.fetch_one(videos.select().where(videos.c.id == video_id))

    if not video:
        raise HTTPException(status_code=404, detail="Video not found")

    # Get existing transcoded qualities
    quality_rows = await database.fetch_all(video_qualities.select().where(video_qualities.c.video_id == video_id))

    existing = [
        VideoQualityInfo(
            name=q["quality"],
            width=q["width"],
            height=q["height"],
            bitrate=q["bitrate"],
            status="completed",
        )
        for q in quality_rows
    ]

    # Determine available qualities based on source resolution
    source_height = video["source_height"] or 0
    available = ["original"]  # Always available
    for preset in QUALITY_PRESETS:
        if preset["height"] <= source_height:
            available.appen
retranscode_video function · python · L3670-L3762 (93 LOC)
api/admin.py
async def retranscode_video(
    request: Request,
    video_id: int,
    data: RetranscodeRequest,
) -> RetranscodeResponse:
    """
    Re-transcode a video, either all qualities or specific ones.

    This will queue the video for re-transcoding while keeping it playable
    until a worker actually claims and starts processing the job (Issue #408).

    The actual cleanup (file deletion, video_qualities deletion, status change)
    is deferred to when a worker claims the job via claim_job().
    """
    # Get video info
    row = await database.fetch_one(videos.select().where(videos.c.id == video_id))
    if not row:
        raise HTTPException(status_code=404, detail="Video not found")

    if row["deleted_at"]:
        raise HTTPException(status_code=400, detail="Cannot re-transcode a deleted video")

    slug = row["slug"]
    video_dir = VIDEOS_DIR / slug

    # Check if source file exists
    source_file = None
    for ext in SUPPORTED_VIDEO_EXTENSIONS:
        candidate = UPLO
Repobility · code-quality intelligence platform · https://repobility.com
get_video_transcript function · python · L3770-L3797 (28 LOC)
api/admin.py
async def get_video_transcript(request: Request, video_id: int) -> TranscriptionResponse:
    """Get transcription status and text for a video."""
    # Get video
    video = await database.fetch_one(videos.select().where(videos.c.id == video_id))
    if not video:
        raise HTTPException(status_code=404, detail="Video not found")

    # Get transcription record
    transcription = await database.fetch_one(transcriptions.select().where(transcriptions.c.video_id == video_id))

    if not transcription:
        return TranscriptionResponse(status=TranscriptionStatus.NONE)

    vtt_url = None
    if transcription["status"] == TranscriptionStatus.COMPLETED and transcription["vtt_path"]:
        vtt_url = f"/videos/{video['slug']}/captions.vtt"

    return TranscriptionResponse(
        status=transcription["status"],
        language=transcription["language"],
        text=transcription["transcript_text"],
        vtt_url=vtt_url,
        word_count=transcription["word_count"],
       
trigger_transcription function · python · L3802-L3869 (68 LOC)
api/admin.py
async def trigger_transcription(request: Request, video_id: int, data: TranscriptionTrigger = None):
    """Manually trigger transcription for a video."""
    # Get video
    video = await database.fetch_one(videos.select().where(videos.c.id == video_id))
    if not video:
        raise HTTPException(status_code=404, detail="Video not found")

    if video["status"] != VideoStatus.READY:
        raise HTTPException(status_code=400, detail="Video must be ready before transcription")

    # Check if transcription already exists
    existing = await database.fetch_one(transcriptions.select().where(transcriptions.c.video_id == video_id))

    if existing:
        if existing["status"] == TranscriptionStatus.PROCESSING:
            raise HTTPException(status_code=400, detail="Transcription already in progress")

        # Reset to pending for re-transcription
        await database.execute(
            transcriptions.update()
            .where(transcriptions.c.video_id == video_id)
       
update_transcript function · python · L3874-L3909 (36 LOC)
api/admin.py
async def update_transcript(request: Request, video_id: int, data: TranscriptionUpdate):
    """Manually edit/correct transcript text and regenerate VTT."""
    # Get video
    video = await database.fetch_one(videos.select().where(videos.c.id == video_id))
    if not video:
        raise HTTPException(status_code=404, detail="Video not found")

    # Get transcription
    transcription = await database.fetch_one(transcriptions.select().where(transcriptions.c.video_id == video_id))

    if not transcription:
        raise HTTPException(status_code=404, detail="No transcription found for this video")

    # Update transcript text
    word_count = len(data.text.split())
    await database.execute(
        transcriptions.update()
        .where(transcriptions.c.video_id == video_id)
        .values(
            transcript_text=data.text,
            word_count=word_count,
        )
    )

    # Audit log
    log_audit(
        AuditAction.TRANSCRIPTION_UPDATE,
        client_ip=get_real_i
delete_transcript function · python · L3914-L3945 (32 LOC)
api/admin.py
async def delete_transcript(request: Request, video_id: int):
    """Delete transcription and VTT file for a video."""
    # Get video
    video = await database.fetch_one(videos.select().where(videos.c.id == video_id))
    if not video:
        raise HTTPException(status_code=404, detail="Video not found")

    # Get transcription
    transcription = await database.fetch_one(transcriptions.select().where(transcriptions.c.video_id == video_id))

    if not transcription:
        raise HTTPException(status_code=404, detail="No transcription found for this video")

    # Delete VTT file if exists
    vtt_path = VIDEOS_DIR / video["slug"] / "captions.vtt"
    if vtt_path.exists():
        vtt_path.unlink()

    # Delete transcription record
    await database.execute(transcriptions.delete().where(transcriptions.c.video_id == video_id))

    # Audit log
    log_audit(
        AuditAction.TRANSCRIPTION_DELETE,
        client_ip=get_real_ip(request),
        user_agent=request.headers.get("u
analytics_overview function · python · L3953-L4059 (107 LOC)
api/admin.py
async def analytics_overview(request: Request, response: Response) -> AnalyticsOverview:
    """Get global analytics overview."""
    # Try to get from cache first
    cache_key = "analytics_overview"
    cached_data = analytics_cache.get(cache_key)

    # Get cache max-age from settings (with env var fallback)
    cache_max_age = await get_analytics_client_cache_max_age()

    if cached_data is not None:
        # Set Cache-Control header for client-side caching
        response.headers["Cache-Control"] = f"private, max-age={cache_max_age}"
        return AnalyticsOverview(**cached_data)

    # Cache miss - compute fresh data
    now = datetime.now(timezone.utc)
    today_start = now.replace(hour=0, minute=0, second=0, microsecond=0)
    week_start = today_start - timedelta(days=7)
    month_start = today_start - timedelta(days=30)

    # Total views
    total_views = await fetch_val_with_retry(sa.select(sa.func.count()).select_from(playback_sessions)) or 0

    # Unique viewers
    u
analytics_videos function · python · L4064-L4171 (108 LOC)
api/admin.py
async def analytics_videos(
    request: Request,
    response: Response,
    limit: int = Query(default=50, ge=1, le=100, description="Max items per page"),
    offset: int = Query(default=0, ge=0, description="Number of items to skip"),
    sort_by: str = "views",
    period: str = "all",
) -> VideoAnalyticsListResponse:
    """Get per-video analytics."""
    # Get cache max-age from settings (with env var fallback)
    cache_max_age = await get_analytics_client_cache_max_age()

    # Try to get from cache first
    cache_key = f"analytics_videos:{limit}:{offset}:{sort_by}:{period}"
    cached_data = analytics_cache.get(cache_key)

    if cached_data is not None:
        # Set Cache-Control header for client-side caching
        response.headers["Cache-Control"] = f"private, max-age={cache_max_age}"
        # Reconstruct response models from cached data
        cached_videos = [VideoAnalyticsSummary(**v) for v in cached_data["videos"]]
        return VideoAnalyticsListResponse(videos
analytics_video_detail function · python · L4176-L4283 (108 LOC)
api/admin.py
async def analytics_video_detail(request: Request, response: Response, video_id: int) -> VideoAnalyticsDetail:
    """Get detailed analytics for a specific video."""
    # Get cache max age from settings service
    cache_max_age = await get_analytics_client_cache_max_age()

    # Try to get from cache first
    cache_key = f"analytics_video_detail:{video_id}"
    cached_data = analytics_cache.get(cache_key)

    if cached_data is not None:
        # Set Cache-Control header for client-side caching
        response.headers["Cache-Control"] = f"private, max-age={cache_max_age}"
        # Reconstruct response models from cached data
        quality_breakdown = [QualityBreakdown(**q) for q in cached_data["quality_breakdown"]]
        views_over_time = [DailyViews(**v) for v in cached_data["views_over_time"]]
        return VideoAnalyticsDetail(
            video_id=cached_data["video_id"],
            title=cached_data["title"],
            duration=cached_data["duration"],
            to
analytics_trends function · python · L4288-L4357 (70 LOC)
api/admin.py
async def analytics_trends(
    request: Request,
    response: Response,
    period: str = "30d",
    video_id: Optional[int] = None,
) -> TrendsResponse:
    """Get time-series analytics data."""
    # Get cache max age from settings service
    cache_max_age = await get_analytics_client_cache_max_age()

    # Try to get from cache first
    cache_key = f"analytics_trends:{period}:{video_id or 'all'}"
    cached_data = analytics_cache.get(cache_key)

    if cached_data is not None:
        # Set Cache-Control header for client-side caching
        response.headers["Cache-Control"] = f"private, max-age={cache_max_age}"
        # Reconstruct response models from cached data
        data = [TrendDataPoint(**d) for d in cached_data["data"]]
        return TrendsResponse(period=cached_data["period"], data=data)

    # Cache miss - compute fresh data
    # Validate period to prevent SQL injection (whitelist approach)
    valid_periods = {"7d": 7, "30d": 30, "90d": 90}
    days = valid_peri
Methodology: Repobility · https://repobility.com/research/state-of-ai-code-2026/
export_videos function · python · L4362-L4465 (104 LOC)
api/admin.py
async def export_videos(
    request: Request,
    status: Optional[str] = Query(None, description="Filter by status (pending, processing, ready, failed)"),
    category_id: Optional[int] = Query(None, description="Filter by category ID"),
    include_deleted: bool = Query(False, description="Include soft-deleted videos"),
    limit: int = Query(10000, ge=1, le=10000, description="Maximum number of videos to export"),
    offset: int = Query(0, ge=0, description="Offset for pagination"),
) -> VideoExportResponse:
    """
    Export video metadata as JSON.

    Supports filtering by status, category, and deleted state.
    For CSV export, use the JSON response and convert client-side.
    """
    # Validate status if provided
    valid_statuses = [s.value for s in VideoStatus]
    if status and status not in valid_statuses:
        raise HTTPException(
            status_code=400,
            detail=f"Invalid status '{status}'. Valid options: {', '.join(valid_statuses)}",
        )

   
parse_worker_capabilities function · python · L4471-L4508 (38 LOC)
api/admin.py
def parse_worker_capabilities(metadata_json: Optional[str]) -> dict:
    """
    Parse worker metadata JSON and return a dict with hardware info and version tracking.

    The metadata can contain:
    - capabilities: GPU/hardware info including code_version
    - deployment_type: How the worker is deployed (kubernetes, systemd, docker, manual)
    """
    result = {
        "hwaccel_enabled": False,
        "hwaccel_type": None,
        "gpu_name": None,
        "code_version": None,
        "deployment_type": None,
    }

    if not metadata_json:
        return result

    try:
        data = json.loads(metadata_json)

        # Extract deployment_type from top level
        result["deployment_type"] = data.get("deployment_type")

        # Handle nested structure {"capabilities": {...}} or flat structure
        caps = data
        if "capabilities" in data and isinstance(data["capabilities"], dict):
            caps = data["capabilities"]

        result["hwaccel_enabled"] = caps.
determine_worker_status function · python · L4511-L4532 (22 LOC)
api/admin.py
def determine_worker_status(
    db_status: str, last_heartbeat: Optional[datetime], current_job_id: Optional[int], offline_threshold: datetime
) -> str:
    """Determine the effective worker status based on heartbeat and current job."""
    if db_status == "disabled":
        return "disabled"

    if not last_heartbeat:
        return "offline"

    # Ensure timezone-aware comparison
    hb = last_heartbeat
    if hb.tzinfo is None:
        hb = hb.replace(tzinfo=timezone.utc)

    if hb < offline_threshold:
        return "offline"

    # Active = has a job, Idle = no job but online
    if current_job_id:
        return "active"
    return "idle"
list_workers_dashboard function · python · L4537-L4662 (126 LOC)
api/admin.py
async def list_workers_dashboard(request: Request) -> WorkerDashboardResponse:
    """
    List all workers with their status and current activity.

    Shows real-time worker status with heartbeat information,
    current job assignments, and hardware capabilities.
    """
    now = datetime.now(timezone.utc)
    offline_threshold = now - timedelta(minutes=WORKER_OFFLINE_THRESHOLD_MINUTES)

    # Get all workers
    worker_rows = await fetch_all_with_retry(workers.select().order_by(workers.c.last_heartbeat.desc()))

    # Batch fetch current job info for all workers with active jobs
    current_job_ids = [row["current_job_id"] for row in worker_rows if row["current_job_id"]]
    current_jobs_info = {}
    if current_job_ids:
        job_rows = await database.fetch_all(
            sa.select(
                transcoding_jobs.c.id,
                transcoding_jobs.c.current_step,
                transcoding_jobs.c.progress_percent,
                videos.c.slug,
                videos.c
list_active_jobs function · python · L4667-L4760 (94 LOC)
api/admin.py
async def list_active_jobs(request: Request) -> ActiveJobsResponse:
    """
    List all active transcoding jobs with worker information.

    Shows jobs that are pending or being processed, including
    which worker is handling each job and progress details.
    """
    # Get all jobs for videos that are pending or processing
    query = sa.text("""
        SELECT
            tj.id as job_id,
            tj.video_id,
            v.slug as video_slug,
            v.title as video_title,
            v.status as video_status,
            tj.worker_id,
            tj.current_step,
            tj.progress_percent,
            tj.started_at,
            tj.claimed_at,
            tj.attempt_number,
            tj.max_attempts,
            w.worker_name,
            w.metadata as capabilities
        FROM transcoding_jobs tj
        JOIN videos v ON tj.video_id = v.id
        LEFT JOIN workers w ON tj.worker_id = w.worker_id
        WHERE v.status IN ('pending', 'processing')
          AND 
get_worker_detail function · python · L4765-L4896 (132 LOC)
api/admin.py
async def get_worker_detail(request: Request, worker_id: str) -> WorkerDetailResponse:
    """
    Get detailed information about a specific worker.

    Includes capabilities, metadata, stats, and recent job history.
    """
    # Find worker by UUID
    worker = await fetch_one_with_retry(workers.select().where(workers.c.worker_id == worker_id))
    if not worker:
        raise HTTPException(status_code=404, detail="Worker not found")

    now = datetime.now(timezone.utc)
    offline_threshold = now - timedelta(minutes=WORKER_OFFLINE_THRESHOLD_MINUTES)
    status = determine_worker_status(
        worker["status"], worker["last_heartbeat"], worker["current_job_id"], offline_threshold
    )

    # Parse capabilities and metadata
    capabilities = None
    if worker["capabilities"]:
        try:
            capabilities = json.loads(worker["capabilities"])
        except (json.JSONDecodeError, TypeError):
            # If capabilities is not valid JSON or malformed, leave as None
    
disable_worker function · python · L4901-L4952 (52 LOC)
api/admin.py
async def disable_worker(request: Request, worker_id: str):
    """
    Disable a worker, preventing it from claiming new jobs.

    The worker's existing claimed job (if any) is released back
    to the pending queue.
    """
    # Find worker by UUID
    worker = await fetch_one_with_retry(workers.select().where(workers.c.worker_id == worker_id))
    if not worker:
        raise HTTPException(status_code=404, detail="Worker not found")

    if worker["status"] == "disabled":
        raise HTTPException(status_code=400, detail="Worker is already disabled")

    async with database.transaction():
        # Mark worker as disabled
        await database.execute(
            workers.update().where(workers.c.id == worker["id"]).values(status="disabled", current_job_id=None)
        )

        # Release any claimed job
        if worker["current_job_id"]:
            job = await database.fetch_one(
                transcoding_jobs.select().where(transcoding_jobs.c.id == worker["current_job
enable_worker function · python · L4957-L4982 (26 LOC)
api/admin.py
async def enable_worker(request: Request, worker_id: str):
    """
    Re-enable a disabled worker, allowing it to claim jobs again.
    """
    # Find worker by UUID
    worker = await fetch_one_with_retry(workers.select().where(workers.c.worker_id == worker_id))
    if not worker:
        raise HTTPException(status_code=404, detail="Worker not found")

    if worker["status"] != "disabled":
        raise HTTPException(status_code=400, detail="Worker is not disabled")

    # Re-enable worker
    await db_execute_with_retry(workers.update().where(workers.c.id == worker["id"]).values(status="active"))

    # Audit log
    log_audit(
        AuditAction.WORKER_ENABLE,
        client_ip=get_real_ip(request),
        user_agent=request.headers.get("user-agent"),
        resource_type="worker",
        resource_id=worker["id"],
        resource_name=worker["worker_name"] or worker["worker_id"][:8],
    )

    return {"status": "ok", "message": "Worker enabled"}
Repobility's GitHub App fixes findings like these · https://github.com/apps/repobility-bot
delete_worker function · python · L4987-L5058 (72 LOC)
api/admin.py
async def delete_worker(
    request: Request,
    worker_id: str,
    revoke_keys: bool = True,
):
    """
    Delete a worker and optionally revoke its API keys.

    Args:
        worker_id: The worker UUID to delete
        revoke_keys: If True (default), revoke all API keys. If False, keep keys active.

    This will:
    - Release any claimed job back to pending
    - Revoke all API keys (if revoke_keys=True)
    - Delete the worker record
    """
    # Find worker by UUID
    worker = await fetch_one_with_retry(workers.select().where(workers.c.worker_id == worker_id))
    if not worker:
        raise HTTPException(status_code=404, detail="Worker not found")

    now = datetime.now(timezone.utc)

    async with database.transaction():
        # Release any claimed job
        if worker["current_job_id"]:
            job = await database.fetch_one(
                transcoding_jobs.select().where(transcoding_jobs.c.id == worker["current_job_id"])
            )
            if job an
restart_worker function · python · L5066-L5116 (51 LOC)
api/admin.py
async def restart_worker(request: Request, worker_id: str):
    """
    Send restart command to a specific worker.

    The worker will finish its current job (if any) before restarting.
    Requires Redis to be configured for pub/sub.
    """
    from api.pubsub import publish_worker_command
    from api.redis_client import get_redis

    # Verify worker exists
    worker = await fetch_one_with_retry(workers.select().where(workers.c.worker_id == worker_id))
    if not worker:
        raise HTTPException(status_code=404, detail="Worker not found")

    # Check if Redis is available
    redis = await get_redis()
    if not redis:
        raise HTTPException(status_code=503, detail="Redis not available - worker commands require Redis pub/sub")

    # Get current version from capabilities if available
    caps = parse_worker_capabilities(worker.get("capabilities"))
    current_version = caps.get("code_version") if caps else None

    # Publish restart command
    success = await publish_w
stop_worker function · python · L5121-L5171 (51 LOC)
api/admin.py
async def stop_worker(request: Request, worker_id: str):
    """
    Send stop command to a specific worker.

    The worker will finish its current job (if any) before stopping.
    The process manager (systemd, k8s) will NOT restart the worker.
    """
    from api.pubsub import publish_worker_command
    from api.redis_client import get_redis

    # Verify worker exists
    worker = await fetch_one_with_retry(workers.select().where(workers.c.worker_id == worker_id))
    if not worker:
        raise HTTPException(status_code=404, detail="Worker not found")

    # Check if Redis is available
    redis = await get_redis()
    if not redis:
        raise HTTPException(status_code=503, detail="Redis not available - worker commands require Redis pub/sub")

    # Get current version from capabilities if available
    caps = parse_worker_capabilities(worker.get("capabilities"))
    current_version = caps.get("code_version") if caps else None

    # Publish stop command
    success = await p
restart_all_workers function · python · L5176-L5225 (50 LOC)
api/admin.py
async def restart_all_workers(request: Request):
    """
    Broadcast restart command to all active workers.

    Each worker will finish its current job before restarting.
    This is useful for deploying updates across all workers.
    """
    from api.pubsub import publish_worker_command
    from api.redis_client import get_redis

    # Check if Redis is available
    redis = await get_redis()
    if not redis:
        raise HTTPException(status_code=503, detail="Redis not available - worker commands require Redis pub/sub")

    # Get all online workers for deployment logging
    online_workers = await fetch_all_with_retry(workers.select().where(workers.c.status.in_(["active", "idle"])))

    # Publish broadcast restart command
    success = await publish_worker_command("all", "restart")
    if not success:
        raise HTTPException(status_code=500, detail="Failed to broadcast restart command")

    # Log deployment events for each worker
    triggered_by = get_real_ip(request)
 
update_worker function · python · L5230-L5284 (55 LOC)
api/admin.py
async def update_worker(request: Request, worker_id: str):
    """
    Send update command to a specific worker.

    The worker will:
    1. Finish its current job (if any)
    2. Pull latest code via git
    3. Restart to apply updates

    This is only available for git-based deployments.
    """
    from api.pubsub import publish_worker_command
    from api.redis_client import get_redis

    # Verify worker exists
    worker = await fetch_one_with_retry(workers.select().where(workers.c.worker_id == worker_id))
    if not worker:
        raise HTTPException(status_code=404, detail="Worker not found")

    # Check if Redis is available
    redis = await get_redis()
    if not redis:
        raise HTTPException(status_code=503, detail="Redis not available - worker commands require Redis pub/sub")

    # Get current version from capabilities if available
    caps = parse_worker_capabilities(worker.get("capabilities"))
    current_version = caps.get("code_version") if caps else None

  
list_deployment_events function · python · L5292-L5332 (41 LOC)
api/admin.py
async def list_deployment_events(
    request: Request,
    worker_id: Optional[str] = Query(None, description="Filter by worker UUID"),
    event_type: Optional[str] = Query(None, description="Filter by event type"),
    limit: int = Query(default=50, ge=1, le=200, description="Number of events to return"),
):
    """
    List recent deployment events.

    Deployment events track worker restarts, updates, and version changes.
    """
    from api.database import deployment_events

    query = deployment_events.select().order_by(deployment_events.c.created_at.desc()).limit(limit)

    if worker_id:
        query = query.where(deployment_events.c.worker_id == worker_id)
    if event_type:
        query = query.where(deployment_events.c.event_type == event_type)

    rows = await fetch_all_with_retry(query)

    events = []
    for row in rows:
        events.append(
            {
                "id": row["id"],
                "worker_id": row["worker_id"],
                "worker_nam
_log_deployment_event function · python · L5335-L5361 (27 LOC)
api/admin.py
async def _log_deployment_event(
    worker_id: str,
    worker_name: Optional[str],
    event_type: str,
    triggered_by: str = "admin",
    old_version: Optional[str] = None,
    new_version: Optional[str] = None,
    status: str = "pending",
    details: Optional[str] = None,
) -> int:
    """Log a deployment event to the database."""
    from api.database import deployment_events

    result = await database.execute(
        deployment_events.insert().values(
            worker_id=worker_id,
            worker_name=worker_name,
            event_type=event_type,
            old_version=old_version,
            new_version=new_version,
            status=status,
            triggered_by=triggered_by,
            details=details,
            created_at=datetime.now(timezone.utc),
        )
    )
    return result
get_worker_logs function · python · L5366-L5428 (63 LOC)
api/admin.py
async def get_worker_logs(
    request: Request,
    worker_id: str,
    lines: int = Query(default=100, ge=1, le=1000, description="Number of log lines to fetch"),
):
    """
    Fetch recent logs from a worker.

    The worker will respond with logs based on its deployment type:
    - systemd: journalctl output
    - kubernetes: Points to kubectl logs
    - docker: Points to docker logs
    - manual: Process info and log file locations

    Requires the worker to be online and have Redis pub/sub enabled.
    """
    from api.pubsub import request_worker_response
    from api.redis_client import get_redis

    # Verify worker exists
    worker = await fetch_one_with_retry(workers.select().where(workers.c.worker_id == worker_id))
    if not worker:
        raise HTTPException(status_code=404, detail="Worker not found")

    # Check if worker is online
    if worker["status"] in ("offline", "disabled"):
        raise HTTPException(
            status_code=400,
            detail=f"Worke
Source: Repobility analyzer · https://repobility.com
get_worker_metrics function · python · L5433-L5485 (53 LOC)
api/admin.py
async def get_worker_metrics(request: Request, worker_id: str):
    """
    Fetch current metrics from a worker.

    Returns CPU, memory, disk usage, and GPU metrics (if available).
    Requires the worker to be online and have Redis pub/sub enabled.
    """
    from api.pubsub import request_worker_response
    from api.redis_client import get_redis

    # Verify worker exists
    worker = await fetch_one_with_retry(workers.select().where(workers.c.worker_id == worker_id))
    if not worker:
        raise HTTPException(status_code=404, detail="Worker not found")

    # Check if worker is online
    if worker["status"] in ("offline", "disabled"):
        raise HTTPException(
            status_code=400,
            detail=f"Worker is {worker['status']} - cannot fetch metrics from offline workers",
        )

    # Check if Redis is available
    redis = await get_redis()
    if not redis:
        raise HTTPException(status_code=503, detail="Redis not available - worker metrics require
sse_progress function · python · L5493-L5596 (104 LOC)
api/admin.py
async def sse_progress(
    request: Request,
    video_ids: Optional[str] = Query(None, description="Comma-separated video IDs to monitor"),
):
    """
    Server-Sent Events endpoint for real-time transcoding progress.

    Subscribes to progress updates for specified videos (or all if none specified).
    Falls back to database polling if Redis is unavailable.

    SSE Message Format:
        event: progress
        data: {"video_id": 123, "progress_percent": 45, ...}

        event: heartbeat
        data: {"timestamp": "..."}
    """

    async def event_generator():
        # Send retry interval for client reconnection
        yield {"event": "retry", "data": str(SSE_RECONNECT_TIMEOUT_MS)}

        # Parse and validate video IDs
        vid_list = []
        if video_ids:
            try:
                vid_list = [int(v.strip()) for v in video_ids.split(",") if v.strip()]
            except ValueError:
                logger.warning(f"Invalid video IDs in SSE request: {video_id
sse_workers function · python · L5601-L5680 (80 LOC)
api/admin.py
async def sse_workers(request: Request):
    """
    Server-Sent Events endpoint for real-time worker status updates.

    Provides:
    - Worker status changes (active/idle/offline)
    - Current job assignments
    - Job completion/failure notifications
    """

    async def event_generator():
        # Send retry interval for client reconnection
        yield {"event": "retry", "data": str(SSE_RECONNECT_TIMEOUT_MS)}

        # Send initial state
        try:
            initial_state = await _get_workers_state()
            yield {"event": "initial", "data": json.dumps(initial_state)}
        except Exception as e:
            logger.warning(f"Failed to get initial workers state: {e}")

        # Check if Redis is available for pub/sub
        redis_available = await is_redis_available()

        if redis_available:
            # Redis-based real-time updates
            subscriber = None
            try:
                subscriber = await subscribe_to_workers()

                if
_get_progress_from_database function · python · L5683-L5743 (61 LOC)
api/admin.py
async def _get_progress_from_database(video_ids: Optional[str]) -> dict:
    """Get progress data from database for SSE fallback polling."""
    # Build query for active videos
    query = (
        sa.select(
            videos.c.id,
            videos.c.slug,
            videos.c.status,
            transcoding_jobs.c.id.label("job_id"),
            transcoding_jobs.c.current_step,
            transcoding_jobs.c.progress_percent,
            transcoding_jobs.c.last_error,
        )
        .select_from(videos.outerjoin(transcoding_jobs, videos.c.id == transcoding_jobs.c.video_id))
        .where(videos.c.status.in_(["pending", "processing"]))
        .where(videos.c.deleted_at.is_(None))
    )

    if video_ids:
        try:
            ids = [int(v.strip()) for v in video_ids.split(",") if v.strip()]
            query = query.where(videos.c.id.in_(ids))
        except ValueError:
            # If video_ids contains invalid values, ignore the filter and return all active videos
     
_get_workers_state function · python · L5746-L5848 (103 LOC)
api/admin.py
async def _get_workers_state() -> dict:
    """Get current workers state from database for SSE."""
    now = datetime.now(timezone.utc)
    offline_threshold = now - timedelta(minutes=WORKER_OFFLINE_THRESHOLD_MINUTES)

    # Get all workers
    rows = await database.fetch_all(workers.select().order_by(workers.c.registered_at.desc()))

    workers_list = []
    stats = {"total": 0, "active": 0, "idle": 0, "offline": 0, "disabled": 0}

    for row in rows:
        stats["total"] += 1

        # Determine effective status
        status = row["status"]
        if status == "disabled":
            stats["disabled"] += 1
        elif status in ("active", "idle", "busy"):
            last_hb = row["last_heartbeat"]
            if last_hb and last_hb.replace(tzinfo=timezone.utc) < offline_threshold:
                status = "offline"
                stats["offline"] += 1
            elif status == "idle":
                stats["idle"] += 1
            else:
                stats["active"] += 
get_admin_watermark_settings function · python · L5858-L5889 (32 LOC)
api/admin.py
async def get_admin_watermark_settings(request: Request):
    """
    Get current watermark configuration.

    Returns the current watermark settings from database (with env var fallback).
    Supports both image and text watermark types.
    """
    # Get watermark settings from database with caching
    settings = await get_watermark_settings()

    watermark_exists = False
    if settings["image"]:
        watermark_path = NAS_STORAGE / settings["image"]
        watermark_exists = watermark_path.exists()

    return {
        "enabled": settings["enabled"],
        "type": settings["type"],
        # Image settings
        "image": settings["image"],
        "image_exists": watermark_exists,
        "image_url": "/api/settings/watermark/image" if watermark_exists else None,
        "max_width_percent": settings["max_width_percent"],
        # Text settings
        "text": settings["text"],
        "text_size": settings["text_size"],
        "text_color": settings["text_color"],
   
get_admin_watermark_image function · python · L5894-L5918 (25 LOC)
api/admin.py
async def get_admin_watermark_image(request: Request):
    """Serve the watermark image for admin preview."""
    # Get watermark settings from database with caching
    settings = await get_watermark_settings()

    if not settings["image"]:
        raise HTTPException(status_code=404, detail="No watermark configured")

    watermark_path = NAS_STORAGE / settings["image"]
    if not watermark_path.exists():
        raise HTTPException(status_code=404, detail="Watermark image not found")

    # Determine content type from extension
    ext = watermark_path.suffix.lower()
    content_types = {
        ".png": "image/png",
        ".jpg": "image/jpeg",
        ".jpeg": "image/jpeg",
        ".webp": "image/webp",
        ".svg": "image/svg+xml",
        ".gif": "image/gif",
    }
    content_type = content_types.get(ext, "application/octet-stream")

    return FileResponse(watermark_path, media_type=content_type)
upload_watermark_image function · python · L5923-L6012 (90 LOC)
api/admin.py
async def upload_watermark_image(
    request: Request,
    file: UploadFile = File(...),
):
    """
    Upload a new watermark image.

    The image is saved to the configured watermark path (VLOG_WATERMARK_IMAGE).
    If no path is configured, saves to 'watermark.png' in NAS_STORAGE.

    Accepts: PNG, JPEG, WebP, SVG, GIF (max 10MB)
    For best results, use a PNG with transparency.

    Note: After uploading, you must set VLOG_WATERMARK_ENABLED=true and
    restart the services for the watermark to appear.
    """
    # Validate file extension
    if not file.filename:
        raise HTTPException(status_code=400, detail="No filename provided")

    ext = Path(file.filename).suffix.lower()
    allowed_extensions = {".png", ".jpg", ".jpeg", ".webp", ".svg", ".gif"}
    if ext not in allowed_extensions:
        raise HTTPException(
            status_code=400,
            detail=f"Unsupported image format. Allowed: {', '.join(sorted(allowed_extensions))}",
        )

    # Check file 
Repobility · code-quality intelligence platform · https://repobility.com
delete_watermark_image function · python · L6017-L6055 (39 LOC)
api/admin.py
async def delete_watermark_image(request: Request):
    """
    Delete the current watermark image.

    Removes the watermark file from storage. You should also set
    'watermark.enabled' to false in Settings to disable the watermark overlay.
    """
    # Get watermark settings from database with caching
    settings = await get_watermark_settings()

    if not settings["image"]:
        raise HTTPException(status_code=404, detail="No watermark configured")

    watermark_path = NAS_STORAGE / settings["image"]
    if not watermark_path.exists():
        raise HTTPException(status_code=404, detail="Watermark image not found")

    try:
        watermark_path.unlink()

        # Audit log
        log_audit(
            AuditAction.SETTINGS_CHANGE,
            client_ip=get_real_ip(request),
            user_agent=request.headers.get("user-agent"),
            resource_type="watermark",
            resource_id=None,
            resource_name=settings["image"],
            details={"act
validate_safe_path function · python · L6069-L6101 (33 LOC)
api/admin.py
def validate_safe_path(base: Path, user_path: str) -> Path:
    """
    Validate that a path stays within the base directory.

    Args:
        base: The base directory that the path must be contained within
        user_path: The user-provided path component

    Returns:
        The validated, resolved path

    Raises:
        HTTPException: If path traversal is detected
    """
    if not user_path:
        raise HTTPException(status_code=400, detail="Path cannot be empty")

    # Check for obvious path traversal attempts
    if ".." in user_path:
        logger.warning(f"Path traversal attempt blocked: {user_path}")
        raise HTTPException(status_code=400, detail="Invalid path")

    try:
        full_path = (base / user_path).resolve()
        base_resolved = base.resolve()

        # Verify the resolved path is within the base directory
        full_path.relative_to(base_resolved)

        return full_path
    except (ValueError, OSError) as e:
        logger.warning(f"Path
sanitize_svg function · python · L6104-L6151 (48 LOC)
api/admin.py
def sanitize_svg(content: bytes) -> bytes:
    """
    Sanitize SVG content by removing potentially dangerous elements and attributes.

    Removes:
    - <script> tags
    - Event handler attributes (onclick, onload, onerror, etc.)
    - javascript: URLs
    - data: URLs (can contain scripts)
    - <foreignObject> elements (can embed HTML)

    Args:
        content: Raw SVG file content

    Returns:
        Sanitized SVG content
    """
    try:
        # Decode to string for regex processing
        svg_str = content.decode("utf-8")

        # Remove script tags and their content
        svg_str = re.sub(r"<script[^>]*>.*?</script>", "", svg_str, flags=re.IGNORECASE | re.DOTALL)
        svg_str = re.sub(r"<script[^>]*/>", "", svg_str, flags=re.IGNORECASE)

        # Remove foreignObject elements (can embed arbitrary HTML)
        svg_str = re.sub(r"<foreignObject[^>]*>.*?</foreignObject>", "", svg_str, flags=re.IGNORECASE | re.DOTALL)
        svg_str = re.sub(r"<foreignObject[^>]*/
sanitize_custom_css function · python · L6154-L6235 (82 LOC)
api/admin.py
def sanitize_custom_css(css: str) -> str:
    """
    Sanitize custom CSS to only allow CSS variable declarations.

    This prevents CSS injection attacks by restricting custom CSS to:
    - CSS custom property (variable) declarations
    - Only in :root or body selectors

    Blocks:
    - url() values (prevent data exfiltration)
    - @import rules
    - expression() (IE-specific)
    - javascript: URLs
    - Arbitrary selectors (only :root and body allowed)

    Args:
        css: The custom CSS string to sanitize

    Returns:
        Sanitized CSS containing only safe variable declarations
    """
    if not css or not css.strip():
        return ""

    # Remove comments (both block and potentially dangerous constructs)
    css = re.sub(r"/\*.*?\*/", "", css, flags=re.DOTALL)

    # Block dangerous patterns
    dangerous_patterns = [
        r"@import\b",  # @import rules
        r"expression\s*\(",  # IE expression()
        r"javascript\s*:",  # javascript: URLs
        r"beha
validate_footer_links function · python · L6238-L6303 (66 LOC)
api/admin.py
def validate_footer_links(links: Any) -> list:
    """
    Validate and sanitize footer links to prevent XSS via javascript: URLs.

    Args:
        links: The footer links data (should be a list of {label, url} objects)

    Returns:
        Validated list of footer links

    Raises:
        HTTPException: If links contain invalid or dangerous URLs
    """
    if links is None:
        return []

    if not isinstance(links, list):
        raise HTTPException(status_code=400, detail="Footer links must be a list")

    validated = []
    dangerous_protocols = ["javascript:", "data:", "vbscript:", "file:"]

    for i, link in enumerate(links):
        if not isinstance(link, dict):
            raise HTTPException(status_code=400, detail=f"Footer link {i} must be an object")

        label = link.get("label", "").strip()
        url = link.get("url", "").strip()

        if not label:
            raise HTTPException(status_code=400, detail=f"Footer link {i} must have a label")

       
get_branding_settings function · python · L6306-L6352 (47 LOC)
api/admin.py
async def get_branding_settings() -> Dict[str, Any]:
    """Get branding settings from database with caching and proper locking."""
    import time

    global _cached_branding_settings, _cached_branding_settings_time

    now = time.time()

    # Fast path: check cache without lock
    if _cached_branding_settings and (now - _cached_branding_settings_time) < _BRANDING_SETTINGS_CACHE_TTL:
        return _cached_branding_settings

    # Slow path: acquire lock and refresh cache
    async with _branding_cache_lock:
        # Double-check after acquiring lock
        now = time.time()
        if _cached_branding_settings and (now - _cached_branding_settings_time) < _BRANDING_SETTINGS_CACHE_TTL:
            return _cached_branding_settings

        try:
            from api.settings_service import get_settings_service

            service = get_settings_service()

            settings = {
                "site_name": await service.get("branding.site_name", "VLog"),
                "logo_pa
get_admin_branding_settings function · python · L6357-L6385 (29 LOC)
api/admin.py
async def get_admin_branding_settings(request: Request):
    """
    Get current branding configuration.

    Returns the current branding settings from database.
    """
    settings = await get_branding_settings()

    logo_exists = False
    if settings["logo_path"]:
        logo_file = NAS_STORAGE / settings["logo_path"]
        logo_exists = logo_file.exists()

    favicon_exists = False
    if settings["favicon_path"]:
        favicon_file = NAS_STORAGE / settings["favicon_path"]
        favicon_exists = favicon_file.exists()

    return {
        "site_name": settings["site_name"],
        "logo_path": settings["logo_path"],
        "logo_exists": logo_exists,
        "logo_url": "/api/v1/settings/branding/logo" if logo_exists else None,
        "favicon_path": settings["favicon_path"],
        "favicon_exists": favicon_exists,
        "favicon_url": "/api/v1/settings/branding/favicon" if favicon_exists else None,
        "footer_text": settings["footer_text"],
        "footer_l
get_admin_logo_image function · python · L6390-L6420 (31 LOC)
api/admin.py
async def get_admin_logo_image(request: Request):
    """Serve the logo image for admin preview."""
    settings = await get_branding_settings()

    if not settings["logo_path"]:
        raise HTTPException(status_code=404, detail="No logo configured")

    # Validate path is within storage directory (prevent path traversal)
    logo_path = validate_safe_path(NAS_STORAGE, settings["logo_path"])
    if not logo_path.exists():
        raise HTTPException(status_code=404, detail="Logo image not found")

    ext = logo_path.suffix.lower()
    content_types = {
        ".png": "image/png",
        ".jpg": "image/jpeg",
        ".jpeg": "image/jpeg",
        ".webp": "image/webp",
        ".svg": "image/svg+xml",
        ".gif": "image/gif",
    }
    content_type = content_types.get(ext, "application/octet-stream")

    return FileResponse(
        logo_path,
        media_type=content_type,
        headers={
            "X-Content-Type-Options": "nosniff",
            "Cache-Control": "pr
Methodology: Repobility · https://repobility.com/research/state-of-ai-code-2026/
upload_logo_image function · python · L6425-L6543 (119 LOC)
api/admin.py
async def upload_logo_image(
    request: Request,
    file: UploadFile = File(...),
):
    """
    Upload a new logo image.

    Accepts: PNG, JPEG, WebP, SVG, GIF (max 10MB)
    For best results, use a PNG or SVG with transparency.
    SVG files are automatically sanitized to remove scripts.
    """
    if not file.filename:
        raise HTTPException(status_code=400, detail="No filename provided")

    ext = Path(file.filename).suffix.lower()
    allowed_extensions = {".png", ".jpg", ".jpeg", ".webp", ".svg", ".gif"}
    if ext not in allowed_extensions:
        raise HTTPException(
            status_code=400,
            detail=f"Unsupported image format. Allowed: {', '.join(sorted(allowed_extensions))}",
        )

    content_length = request.headers.get("content-length")
    if content_length and int(content_length) > MAX_THUMBNAIL_UPLOAD_SIZE:
        raise HTTPException(
            status_code=413,
            detail=f"File too large. Maximum size: {MAX_THUMBNAIL_UPLOAD_SIZ
delete_logo_image function · python · L6548-L6590 (43 LOC)
api/admin.py
async def delete_logo_image(request: Request):
    """Delete the current logo image."""
    settings = await get_branding_settings()

    if not settings["logo_path"]:
        raise HTTPException(status_code=404, detail="No logo configured")

    # Validate path to prevent traversal attacks
    logo_path = validate_safe_path(NAS_STORAGE, settings["logo_path"])
    if not logo_path.exists():
        raise HTTPException(status_code=404, detail="Logo image not found")

    try:
        logo_path.unlink()

        log_audit(
            AuditAction.SETTINGS_CHANGE,
            client_ip=get_real_ip(request),
            user_agent=request.headers.get("user-agent"),
            resource_type="branding",
            resource_id=None,
            resource_name=settings["logo_path"],
            details={"action": "logo_delete"},
        )

        # Clear the setting
        from api.settings_service import get_settings_service

        service = get_settings_service()
        await service.s
get_admin_favicon function · python · L6595-L6622 (28 LOC)
api/admin.py
async def get_admin_favicon(request: Request):
    """Serve the favicon for admin preview."""
    settings = await get_branding_settings()

    if not settings["favicon_path"]:
        raise HTTPException(status_code=404, detail="No favicon configured")

    # Validate path to prevent traversal attacks
    favicon_path = validate_safe_path(NAS_STORAGE, settings["favicon_path"])
    if not favicon_path.exists():
        raise HTTPException(status_code=404, detail="Favicon not found")

    ext = favicon_path.suffix.lower()
    content_types = {
        ".ico": "image/x-icon",
        ".png": "image/png",
        ".svg": "image/svg+xml",
    }
    content_type = content_types.get(ext, "application/octet-stream")

    return FileResponse(
        favicon_path,
        media_type=content_type,
        headers={
            "X-Content-Type-Options": "nosniff",
            "Cache-Control": "private, max-age=3600",
        },
    )
‹ prevpage 2 / 20next ›