← back to filthyrake__vlog

Function bodies 1,000 total

All specs Real LLM only Function bodies
upload_favicon function · python · L6627-L6735 (109 LOC)
api/admin.py
async def upload_favicon(
    request: Request,
    file: UploadFile = File(...),
):
    """
    Upload a new favicon.

    Accepts: ICO, PNG, SVG (max 1MB)
    For best results, use a square image (32x32 or 64x64 for ICO, any size for SVG).
    """
    if not file.filename:
        raise HTTPException(status_code=400, detail="No filename provided")

    ext = Path(file.filename).suffix.lower()
    allowed_extensions = {".ico", ".png", ".svg"}
    if ext not in allowed_extensions:
        raise HTTPException(
            status_code=400,
            detail=f"Unsupported favicon format. Allowed: {', '.join(sorted(allowed_extensions))}",
        )

    max_favicon_size = 1 * 1024 * 1024  # 1MB
    content_length = request.headers.get("content-length")
    if content_length and int(content_length) > max_favicon_size:
        raise HTTPException(
            status_code=413,
            detail=f"File too large. Maximum size: {max_favicon_size // (1024 * 1024)}MB",
        )

    settings =
delete_favicon function · python · L6740-L6782 (43 LOC)
api/admin.py
async def delete_favicon(request: Request):
    """Delete the current favicon."""
    settings = await get_branding_settings()

    if not settings["favicon_path"]:
        raise HTTPException(status_code=404, detail="No favicon configured")

    # Validate path to prevent traversal attacks
    favicon_path = validate_safe_path(NAS_STORAGE, settings["favicon_path"])
    if not favicon_path.exists():
        raise HTTPException(status_code=404, detail="Favicon not found")

    try:
        favicon_path.unlink()

        log_audit(
            AuditAction.SETTINGS_CHANGE,
            client_ip=get_real_ip(request),
            user_agent=request.headers.get("user-agent"),
            resource_type="branding",
            resource_id=None,
            resource_name=settings["favicon_path"],
            details={"action": "favicon_delete"},
        )

        # Clear the setting
        from api.settings_service import get_settings_service

        service = get_settings_service()
        
list_settings function · python · L6793-L6820 (28 LOC)
api/admin.py
async def list_settings(request: Request) -> SettingsByCategoryResponse:
    """
    Return every setting, bucketed by category.

    Fetches the category -> settings mapping from the settings service and
    wraps each entry in a SettingResponse so the admin UI can render the
    settings one category at a time.
    """

    def _as_response(category_name, entry):
        # "constraints" and "updated_by" are optional on an entry; the other
        # keys are read with [] and must be present.
        return SettingResponse(
            key=entry["key"],
            value=entry["value"],
            category=category_name,
            value_type=entry["value_type"],
            description=entry["description"],
            constraints=entry.get("constraints"),
            updated_at=entry["updated_at"],
            updated_by=entry.get("updated_by"),
        )

    grouped = await get_settings_service().get_all()

    return SettingsByCategoryResponse(
        categories={
            category_name: [_as_response(category_name, entry) for entry in entries]
            for category_name, entries in grouped.items()
        }
    )
list_settings_categories function · python · L6825-L6833 (9 LOC)
api/admin.py
async def list_settings_categories(request: Request) -> List[str]:
    """
    Return the setting category names.

    The result comes straight from the settings service's get_categories()
    and is intended for building navigation in the admin UI.
    """
    category_names = await get_settings_service().get_categories()
    return category_names
get_settings_by_category function · python · L6838-L6867 (30 LOC)
api/admin.py
async def get_settings_by_category(request: Request, category: str) -> SettingsCategoryResponse:
    """
    Get all settings in a specific category.

    Returns settings filtered by the specified category,
    useful for displaying a single tab in the settings UI.
    """
    service = get_settings_service()
    settings_list = await service.get_category(category)

    if not settings_list:
        # Return empty category rather than 404 for consistent UI behavior
        return SettingsCategoryResponse(category=category, settings=[])

    return SettingsCategoryResponse(
        category=category,
        settings=[
            SettingResponse(
                key=s["key"],
                value=s["value"],
                category=category,
                value_type=s["value_type"],
                description=s["description"],
                constraints=s.get("constraints"),
                updated_at=s["updated_at"],
                updated_by=s.get("updated_by"),
            )
get_setting function · python · L6872-L6893 (22 LOC)
api/admin.py
async def get_setting(request: Request, key: str) -> SettingResponse:
    """
    Look up one setting by its dot-notation key.

    Example key: "transcoding.hls_segment_duration".

    Raises:
        HTTPException: 404 when the settings service has no entry for the key.
    """
    record = await get_settings_service().get_single(key)

    if record is not None:
        # "constraints" and "updated_by" are optional on the stored record.
        return SettingResponse(
            key=record["key"],
            value=record["value"],
            category=record["category"],
            value_type=record["value_type"],
            description=record["description"],
            constraints=record.get("constraints"),
            updated_at=record["updated_at"],
            updated_by=record.get("updated_by"),
        )

    raise HTTPException(status_code=404, detail=f"Setting not found: {key}")
update_setting function · python · L6898-L6994 (97 LOC)
api/admin.py
async def update_setting(request: Request, key: str, data: SettingUpdate) -> SettingResponse:
    """
    Update a setting value.

    Validates the value against the setting's type and constraints
    before saving. The change is reflected immediately in the cache.
    If the setting doesn't exist but is in KNOWN_SETTINGS, it will be created.
    """
    from api.settings_service import KNOWN_SETTINGS

    # Sanitize custom CSS to only allow CSS variable declarations (security)
    if key == "theme.custom_css" and data.value:
        if isinstance(data.value, str):
            data.value = sanitize_custom_css(data.value)

    # Validate footer links to prevent javascript: URLs (XSS prevention)
    if key == "branding.footer_links":
        data.value = validate_footer_links(data.value)

    service = get_settings_service()

    # Verify setting exists
    existing = await service.get_single(key)
    if existing is None:
        # Check if it's a known setting and create it
        kno
If a scraper extracted this row, it came from Repobility (https://repobility.com)
create_setting function · python · L6999-L7053 (55 LOC)
api/admin.py
async def create_setting(request: Request, data: SettingCreate) -> SettingResponse:
    """
    Create a new setting.

    Settings are created with a unique key in dot notation
    (e.g., "transcoding.hls_segment_duration").
    """
    service = get_settings_service()

    # Check if already exists
    existing = await service.get_single(data.key)
    if existing is not None:
        raise HTTPException(status_code=409, detail=f"Setting already exists: {data.key}")

    try:
        await service.create(
            key=data.key,
            value=data.value,
            category=data.category,
            value_type=data.value_type,
            description=data.description,
            constraints=data.constraints.model_dump() if data.constraints else None,
            updated_by="admin",
        )
    except SettingsValidationError as e:
        logger.warning(f"Settings validation error for key {data.key}: {e}")
        raise HTTPException(status_code=400, detail=sanitize_error_me
delete_setting function · python · L7058-L7085 (28 LOC)
api/admin.py
async def delete_setting(request: Request, key: str):
    """
    Remove a setting by key.

    The setting is looked up first so that a missing key yields a 404 and so
    that its previous value can be recorded in the audit log. Deleting a
    setting may affect application behavior; use with caution.

    Raises:
        HTTPException: 404 when no setting exists for the key.
    """
    settings_service = get_settings_service()

    previous = await settings_service.get_single(key)
    if previous is None:
        raise HTTPException(status_code=404, detail=f"Setting not found: {key}")

    await settings_service.delete(key)

    # Record who removed the setting and what value it held.
    caller_ip = get_real_ip(request)
    caller_agent = request.headers.get("user-agent")
    audit_details = {"action": "delete", "old_value": previous["value"]}
    log_audit(
        AuditAction.SETTINGS_CHANGE,
        client_ip=caller_ip,
        user_agent=caller_agent,
        resource_type="setting",
        resource_id=None,
        resource_name=key,
        details=audit_details,
    )

    response = {"status": "ok", "message": f"Setting deleted: {key}"}
    return response
export_settings function · python · L7090-L7120 (31 LOC)
api/admin.py
async def export_settings(request: Request) -> SettingsExport:
    """
    Export all settings as JSON.

    Returns all settings in a format suitable for backup
    or transferring between environments.
    """
    service = get_settings_service()
    all_settings = await service.get_all()

    settings_list = []
    for category, cat_settings in all_settings.items():
        for s in cat_settings:
            settings_list.append(
                SettingResponse(
                    key=s["key"],
                    value=s["value"],
                    category=category,
                    value_type=s["value_type"],
                    description=s["description"],
                    constraints=s.get("constraints"),
                    updated_at=s["updated_at"],
                    updated_by=s.get("updated_by"),
                )
            )

    return SettingsExport(
        version="1.0",
        exported_at=datetime.now(timezone.utc),
        settings=settings_list,
    
import_settings function · python · L7125-L7182 (58 LOC)
api/admin.py
async def import_settings(request: Request, data: SettingsImport):
    """
    Import settings from JSON.

    Imports settings from an export file. By default, existing
    settings are skipped. Use overwrite=true to replace existing values.
    """
    service = get_settings_service()
    results = {"created": 0, "updated": 0, "skipped": 0, "errors": []}

    for setting in data.settings:
        try:
            existing = await service.get_single(setting.key)

            if existing is not None:
                if data.overwrite:
                    await service.set(setting.key, setting.value, updated_by="import")
                    results["updated"] += 1
                else:
                    results["skipped"] += 1
            else:
                await service.create(
                    key=setting.key,
                    value=setting.value,
                    category=setting.category,
                    value_type=setting.value_type,
                    descriptio
invalidate_settings_cache function · python · L7187-L7197 (11 LOC)
api/admin.py
async def invalidate_settings_cache(request: Request):
    """
    Drop the cached settings.

    Calls invalidate_cache() on the settings service so that subsequent reads
    fetch fresh data. Handy after settings were edited directly in the
    database.
    """
    get_settings_service().invalidate_cache()

    response = {"status": "ok", "message": "Settings cache invalidated"}
    return response
get_settings_cache_stats function · python · L7202-L7209 (8 LOC)
api/admin.py
async def get_settings_cache_stats(request: Request):
    """
    Report settings cache statistics.

    Returns whatever the settings service's get_cache_stats() reports about
    the current state of its cache.
    """
    cache_stats = get_settings_service().get_cache_stats()
    return cache_stats
seed_settings_from_environment function · python · L7214-L7233 (20 LOC)
api/admin.py
async def seed_settings_from_environment(request: Request):
    """
    Seed database settings from environment variables.

    Delegates to seed_settings_from_env(updated_by="admin"), which is expected
    to create database entries for known settings found in the environment
    while skipping settings that already exist and variables that are unset.
    Useful for an initial migration away from env-var-based configuration.

    Returns a dict with "status" set to "ok" plus the helper's "seeded",
    "skipped" and "details" entries.
    """
    outcome = await seed_settings_from_env(updated_by="admin")

    summary = {"status": "ok"}
    # Copy only the fields exposed by this endpoint, in a fixed order.
    for field in ("seeded", "skipped", "details"):
        summary[field] = outcome[field]
    return summary
get_reencode_status function · python · L7243-L7282 (40 LOC)
api/admin.py
async def get_reencode_status(request: Request):
    """
    Get the status of the re-encode queue.

    Returns statistics about pending, in-progress, completed, and failed jobs.
    """
    query = sa.text("""
        SELECT
            status,
            COUNT(*) as count
        FROM reencode_queue
        GROUP BY status
    """)
    rows = await fetch_all_with_retry(query)

    stats = {
        "pending": 0,
        "in_progress": 0,
        "completed": 0,
        "failed": 0,
        "cancelled": 0,
    }
    for row in rows:
        stats[row["status"]] = row["count"]

    # Get legacy video count (videos still on hls_ts format)
    legacy_query = sa.text("""
        SELECT COUNT(*) as count FROM videos
        WHERE status = 'ready'
        AND deleted_at IS NULL
        AND (streaming_format = 'hls_ts' OR streaming_format IS NULL)
    """)
    legacy_result = await fetch_one_with_retry(legacy_query)
    legacy_count = legacy_result["count"] if legacy_result else 0

    ret
Powered by Repobility — scan your code at https://repobility.com
queue_videos_for_reencode function · python · L7287-L7350 (64 LOC)
api/admin.py
async def queue_videos_for_reencode(
    request: Request,
    video_ids: List[int] = Query(..., description="Video IDs to queue"),
    priority: str = Query("normal", regex="^(high|normal|low)$"),
    target_format: str = Query("cmaf", regex="^(hls_ts|cmaf)$"),
    target_codec: str = Query("hevc", regex="^(h264|hevc|av1)$"),
):
    """
    Queue specific videos for re-encoding.

    Args:
        video_ids: List of video IDs to queue
        priority: Queue priority (high, normal, low)
        target_format: Target streaming format
        target_codec: Target video codec
    """
    queued = []
    skipped = []
    errors = []

    for video_id in video_ids:
        # Check if video exists and is ready
        video = await fetch_one_with_retry(
            sa.text("SELECT id, status, streaming_format FROM videos WHERE id = :id").bindparams(id=video_id)
        )

        if not video:
            errors.append({"video_id": video_id, "error": "Video not found"})
            continue
queue_all_legacy_videos function · python · L7355-L7410 (56 LOC)
api/admin.py
async def queue_all_legacy_videos(
    request: Request,
    priority: str = Query("low", regex="^(high|normal|low)$"),
    target_format: str = Query("cmaf", regex="^(hls_ts|cmaf)$"),
    target_codec: str = Query("hevc", regex="^(h264|hevc|av1)$"),
    batch_size: int = Query(100, ge=1, le=1000),
):
    """
    Queue all legacy (HLS/TS) videos for re-encoding to CMAF.

    This will queue videos that are:
    - Status is 'ready'
    - Not deleted
    - Using hls_ts format (or NULL for legacy videos)
    - Not already pending in the queue

    Uses batch insert for efficiency with large numbers of videos.
    """
    # Find legacy videos not already queued
    query = sa.text("""
        SELECT v.id
        FROM videos v
        LEFT JOIN reencode_queue rq ON v.id = rq.video_id AND rq.status = 'pending'
        WHERE v.status = 'ready'
        AND v.deleted_at IS NULL
        AND (v.streaming_format = 'hls_ts' OR v.streaming_format IS NULL)
        AND rq.id IS NULL
        LIMIT :lim
list_reencode_jobs function · python · L7415-L7482 (68 LOC)
api/admin.py
async def list_reencode_jobs(
    request: Request,
    status: Optional[str] = Query(None, regex="^(pending|in_progress|completed|failed|cancelled)$"),
    limit: int = Query(50, ge=1, le=500),
    offset: int = Query(0, ge=0),
):
    """
    List re-encode queue jobs with optional status filter.

    Returns paginated results with total count for UI pagination.
    """
    # Build WHERE clause
    where_clause = ""
    count_params = {}
    if status:
        where_clause = " WHERE rq.status = :status"
        count_params["status"] = status

    # Get total count for pagination
    count_query_str = f"""
        SELECT COUNT(*) as total
        FROM reencode_queue rq
        {where_clause}
    """
    if count_params:
        count_query = sa.text(count_query_str).bindparams(**count_params)
    else:
        count_query = sa.text(count_query_str)
    count_result = await fetch_one_with_retry(count_query)
    total = count_result["total"] if count_result else 0

    # Get paginated r
cancel_reencode_job function · python · L7487-L7509 (23 LOC)
api/admin.py
async def cancel_reencode_job(request: Request, job_id: int):
    """
    Cancel a pending re-encode job.

    Only pending jobs can be cancelled. In-progress jobs will complete.

    The status is checked up front to give a precise error, and the UPDATE
    itself is also restricted to rows that are still 'pending'. Without that
    predicate, a worker claiming the job between the check and the update
    would have its in-progress job overwritten with 'cancelled'.

    Args:
        request: Incoming request (unused here, kept for the route signature).
        job_id: ID of the reencode_queue row to cancel.

    Returns:
        A dict with "status" == "ok" and a confirmation message.

    Raises:
        HTTPException: 404 if the job does not exist; 400 if the job is not
            (or is no longer) in the 'pending' state.
    """
    lookup = sa.text("SELECT id, status FROM reencode_queue WHERE id = :id").bindparams(id=job_id)

    # Check current status
    job = await fetch_one_with_retry(lookup)

    if not job:
        raise HTTPException(status_code=404, detail="Job not found")

    if job["status"] != "pending":
        raise HTTPException(status_code=400, detail=f"Cannot cancel job with status '{job['status']}'")

    # Guard the write with the status so a concurrently claimed job is left
    # untouched.
    # NOTE(review): other call sites in this file pass only the statement to
    # db_execute_with_retry; confirm the leading `database` argument is expected.
    await db_execute_with_retry(
        database,
        reencode_queue.update()
        .where(reencode_queue.c.id == job_id)
        .where(reencode_queue.c.status == "pending")
        .values(status="cancelled"),
    )

    # Re-read to confirm the cancellation actually took effect; if a worker
    # claimed the job in the meantime the guarded UPDATE matched no rows.
    current = await fetch_one_with_retry(lookup)
    if not current:
        raise HTTPException(status_code=404, detail="Job not found")
    if current["status"] != "cancelled":
        raise HTTPException(status_code=400, detail=f"Cannot cancel job with status '{current['status']}'")

    return {"status": "ok", "message": f"Job {job_id} cancelled"}
claim_reencode_job function · python · L7514-L7567 (54 LOC)
api/admin.py
async def claim_reencode_job(request: Request):
    """
    Claim the next available re-encode job.

    This is called by re-encode workers to get work.
    Returns the job details if one is available, or null if queue is empty.

    Uses atomic UPDATE with RETURNING to prevent race conditions when
    multiple workers try to claim jobs simultaneously.
    """
    # Verify worker API key using shared helper (supports both argon2 and SHA-256)
    api_key = request.headers.get("X-Worker-API-Key")
    if not api_key:
        raise HTTPException(status_code=401, detail="Worker API key required")

    # authenticate_api_key raises HTTPException(401) if invalid
    await authenticate_api_key(api_key, request)

    # Atomic claim using UPDATE with subquery and RETURNING
    # This prevents race conditions when multiple workers claim simultaneously
    claim_query = sa.text("""
        UPDATE reencode_queue
        SET status = 'in_progress',
            started_at = :started_at
        WHERE
update_reencode_job function · python · L7572-L7626 (55 LOC)
api/admin.py
async def update_reencode_job(
    request: Request,
    job_id: int,
):
    """
    Update a re-encode job status.

    Used by workers to mark jobs as completed/failed.
    Requires worker API key authentication.
    """
    # Verify worker API key using shared helper (supports both argon2 and SHA-256)
    api_key = request.headers.get("X-Worker-API-Key")
    if not api_key:
        raise HTTPException(status_code=401, detail="Worker API key required")

    # authenticate_api_key raises HTTPException(401) if invalid
    await authenticate_api_key(api_key, request)

    # Validate job exists
    job = await fetch_one_with_retry(
        sa.text("SELECT id, status FROM reencode_queue WHERE id = :id").bindparams(id=job_id)
    )
    if not job:
        raise HTTPException(status_code=404, detail="Job not found")

    body = await request.json()
    status = body.get("status")
    error_message = body.get("error_message")
    retry_count = body.get("retry_count")

    # Validate status v
list_custom_fields function · python · L7634-L7716 (83 LOC)
api/admin.py
async def list_custom_fields(
    request: Request,
    category_id: Optional[int] = Query(
        default=None, description="Filter by category. Use 0 for global fields only, or omit for all fields."
    ),
) -> CustomFieldListResponse:
    """
    List custom field definitions.

    - No filter: Returns all fields (global + category-specific)
    - category_id=0: Returns only global fields
    - category_id=N: Returns global fields + fields for category N
    """
    query = (
        sa.select(
            custom_field_definitions.c.id,
            custom_field_definitions.c.name,
            custom_field_definitions.c.slug,
            custom_field_definitions.c.field_type,
            custom_field_definitions.c.options,
            custom_field_definitions.c.required,
            custom_field_definitions.c.category_id,
            custom_field_definitions.c.position,
            custom_field_definitions.c.constraints,
            custom_field_definitions.c.description,
          
create_custom_field function · python · L7721-L7825 (105 LOC)
api/admin.py
async def create_custom_field(
    request: Request,
    data: CustomFieldCreate,
) -> CustomFieldResponse:
    """Create a new custom field definition."""
    # Generate slug from name
    slug = slugify(data.name)

    # Validate category exists if provided
    category_name = None
    if data.category_id is not None:
        category = await fetch_one_with_retry(categories.select().where(categories.c.id == data.category_id))
        if not category:
            raise HTTPException(status_code=400, detail="Category not found")
        category_name = category["name"]

    # Check for duplicate slug in same scope
    duplicate_query = custom_field_definitions.select().where(custom_field_definitions.c.slug == slug)
    if data.category_id is not None:
        duplicate_query = duplicate_query.where(custom_field_definitions.c.category_id == data.category_id)
    else:
        duplicate_query = duplicate_query.where(custom_field_definitions.c.category_id.is_(None))

    existing = await 
All rows above produced by Repobility · https://repobility.com
get_custom_field function · python · L7830-L7887 (58 LOC)
api/admin.py
async def get_custom_field(
    request: Request,
    field_id: int,
) -> CustomFieldResponse:
    """Get a custom field definition by ID."""
    query = (
        sa.select(
            custom_field_definitions.c.id,
            custom_field_definitions.c.name,
            custom_field_definitions.c.slug,
            custom_field_definitions.c.field_type,
            custom_field_definitions.c.options,
            custom_field_definitions.c.required,
            custom_field_definitions.c.category_id,
            custom_field_definitions.c.position,
            custom_field_definitions.c.constraints,
            custom_field_definitions.c.description,
            custom_field_definitions.c.created_at,
            categories.c.name.label("category_name"),
        )
        .select_from(
            custom_field_definitions.outerjoin(categories, custom_field_definitions.c.category_id == categories.c.id)
        )
        .where(custom_field_definitions.c.id == field_id)
    )

    row =
update_custom_field function · python · L7892-L7999 (108 LOC)
api/admin.py
async def update_custom_field(
    request: Request,
    field_id: int,
    data: CustomFieldUpdate,
) -> CustomFieldResponse:
    """
    Update a custom field definition.

    Note: field_type and category_id cannot be changed after creation.
    """
    # Fetch existing field
    existing = await fetch_one_with_retry(
        custom_field_definitions.select().where(custom_field_definitions.c.id == field_id)
    )
    if not existing:
        raise HTTPException(status_code=404, detail="Custom field not found")

    # Build update values
    update_values = {}

    if data.name is not None:
        # Generate new slug from new name
        new_slug = slugify(data.name)

        # Check for duplicate slug in same scope (exclude current field)
        duplicate_query = (
            custom_field_definitions.select()
            .where(custom_field_definitions.c.slug == new_slug)
            .where(custom_field_definitions.c.id != field_id)
        )
        if existing["category_id"] i
delete_custom_field function · python · L8004-L8039 (36 LOC)
api/admin.py
async def delete_custom_field(request: Request, field_id: int):
    """
    Delete a custom field definition.

    All values associated with this field will be deleted (CASCADE).
    """
    # Verify field exists
    existing = await fetch_one_with_retry(
        custom_field_definitions.select().where(custom_field_definitions.c.id == field_id)
    )
    if not existing:
        raise HTTPException(status_code=404, detail="Custom field not found")

    # Count affected videos
    count_query = sa.select(sa.func.count()).where(video_custom_fields.c.field_id == field_id)
    affected_count = await fetch_val_with_retry(count_query)

    # Delete field (CASCADE will delete video_custom_fields entries)
    await db_execute_with_retry(custom_field_definitions.delete().where(custom_field_definitions.c.id == field_id))

    # Audit log
    log_audit(
        AuditAction.CUSTOM_FIELD_DELETE,
        client_ip=get_real_ip(request),
        user_agent=request.headers.get("user-agent"),
        r
_validate_custom_field_value function · python · L8045-L8114 (70 LOC)
api/admin.py
def _validate_custom_field_value(field_type: str, value, options: list = None, constraints: dict = None):
    """Validate a value against a field type and constraints."""
    if value is None:
        return None  # None is always valid (clears the field)

    if field_type == "text":
        if not isinstance(value, str):
            raise ValueError("Value must be a string")
        if constraints:
            if constraints.get("min_length") and len(value) < constraints["min_length"]:
                raise ValueError(f"Value must be at least {constraints['min_length']} characters")
            if constraints.get("max_length") and len(value) > constraints["max_length"]:
                raise ValueError(f"Value must be at most {constraints['max_length']} characters")
            if constraints.get("pattern"):
                try:
                    if not re.match(constraints["pattern"], value):
                        raise ValueError("Value does not match required pattern")
       
get_video_custom_fields function · python · L8119-L8193 (75 LOC)
api/admin.py
async def get_video_custom_fields(
    request: Request,
    video_id: int,
) -> VideoCustomFieldsResponse:
    """
    Get custom field values for a video.

    Returns all applicable fields (global + category-specific) with their current values.
    Fields without values return null.
    """
    # Verify video exists and get its category
    video = await fetch_one_with_retry(videos.select().where(videos.c.id == video_id))
    if not video:
        raise HTTPException(status_code=404, detail="Video not found")

    # Get applicable field definitions (global + video's category)
    field_query = (
        sa.select(
            custom_field_definitions.c.id,
            custom_field_definitions.c.name,
            custom_field_definitions.c.slug,
            custom_field_definitions.c.field_type,
            custom_field_definitions.c.options,
            custom_field_definitions.c.required,
        )
        .where(
            sa.or_(
                custom_field_definitions.c.categ
set_video_custom_fields function · python · L8198-L8296 (99 LOC)
api/admin.py
async def set_video_custom_fields(
    request: Request,
    video_id: int,
    data: VideoCustomFieldsUpdate,
) -> VideoCustomFieldsResponse:
    """
    Set custom field values for a video.

    Values are validated against field type and constraints.
    Use null to clear a field value.
    """
    # Verify video exists and get its category
    video = await fetch_one_with_retry(videos.select().where(videos.c.id == video_id))
    if not video:
        raise HTTPException(status_code=404, detail="Video not found")

    # Get field definitions for validation
    field_ids = list(data.values.keys())
    if not field_ids:
        return await get_video_custom_fields(request, video_id)

    field_query = custom_field_definitions.select().where(custom_field_definitions.c.id.in_(field_ids))
    field_rows = await fetch_all_with_retry(field_query)
    fields_map = {row["id"]: row for row in field_rows}

    # Validate each value
    errors = []
    for field_id, value in data.values.items()
bulk_update_custom_fields function · python · L8301-L8450 (150 LOC)
api/admin.py
async def bulk_update_custom_fields(
    request: Request,
    data: BulkCustomFieldsUpdate,
) -> BulkCustomFieldsResponse:
    """
    Update custom field values for multiple videos.

    The same field values are applied to all specified videos.
    """
    # Validate field definitions
    field_ids = list(data.values.keys())
    if not field_ids:
        raise HTTPException(status_code=400, detail="No field values provided")

    field_query = custom_field_definitions.select().where(custom_field_definitions.c.id.in_(field_ids))
    field_rows = await fetch_all_with_retry(field_query)
    fields_map = {row["id"]: row for row in field_rows}

    # Validate all fields exist
    missing = [fid for fid in field_ids if fid not in fields_map]
    if missing:
        raise HTTPException(status_code=400, detail=f"Fields not found: {missing}")

    # Validate values against field types
    for field_id, value in data.values.items():
        field = fields_map[field_id]
        try:
          
_build_playlist_response function · python · L8456-L8475 (20 LOC)
api/admin.py
def _build_playlist_response(row: dict, video_count: int = 0, total_duration: float = 0) -> PlaylistResponse:
    """
    Turn a playlist database row into a PlaylistResponse.

    Args:
        row: Playlist row; "description", "thumbnail_path" and "updated_at"
            are optional, the remaining keys are read with [].
        video_count: Number of videos to report for the playlist.
        total_duration: Combined duration to report for the playlist.

    Returns:
        The populated PlaylistResponse. thumbnail_url is None when the row
        has no (or an empty) thumbnail_path.
    """
    thumbnail_path = row.get("thumbnail_path")
    # Only build a URL when a thumbnail path is actually stored.
    thumbnail_url = f"{get_video_url_prefix()}/{thumbnail_path}" if thumbnail_path else None

    fields = {
        "id": row["id"],
        "title": row["title"],
        "slug": row["slug"],
        "description": row.get("description"),
        "thumbnail_url": thumbnail_url,
        "visibility": row["visibility"],
        "playlist_type": row["playlist_type"],
        "is_featured": row["is_featured"],
        "video_count": video_count,
        "total_duration": total_duration,
        "created_at": row["created_at"],
        "updated_at": row.get("updated_at"),
    }
    return PlaylistResponse(**fields)
Repobility's GitHub App fixes findings like these · https://github.com/apps/repobility-bot
list_playlists function · python · L8480-L8510 (31 LOC)
api/admin.py
async def list_playlists(
    request: Request,
    limit: int = Query(default=50, ge=1, le=200),
    offset: int = Query(default=0, ge=0),
) -> PlaylistListResponse:
    """List all playlists with video counts (with pagination)."""
    # Get total count
    count_query = sa.text("""
        SELECT COUNT(*) FROM playlists WHERE deleted_at IS NULL
    """)
    total_count = await fetch_val_with_retry(count_query)

    # Get playlists with pagination
    query = sa.text("""
        SELECT
            p.*,
            COUNT(DISTINCT pi.video_id) as video_count,
            COALESCE(SUM(v.duration), 0) as total_duration
        FROM playlists p
        LEFT JOIN playlist_items pi ON pi.playlist_id = p.id
        LEFT JOIN videos v ON v.id = pi.video_id AND v.deleted_at IS NULL
        WHERE p.deleted_at IS NULL
        GROUP BY p.id
        ORDER BY p.created_at DESC
        LIMIT :limit OFFSET :offset
    """).bindparams(limit=limit, offset=offset)
    rows = await fetch_all_with_retry(qu
create_playlist function · python · L8515-L8567 (53 LOC)
api/admin.py
async def create_playlist(request: Request, data: PlaylistCreate) -> PlaylistResponse:
    """Create a new playlist."""
    slug = slugify(data.title)

    # Check for duplicate slug
    existing = await fetch_one_with_retry(
        playlists.select().where(playlists.c.slug == slug).where(playlists.c.deleted_at.is_(None))
    )
    if existing:
        raise HTTPException(status_code=400, detail="Playlist with this title already exists")

    now = datetime.now(timezone.utc)
    query = playlists.insert().values(
        title=data.title,
        slug=slug,
        description=data.description,
        visibility=data.visibility,
        playlist_type=data.playlist_type,
        is_featured=data.is_featured,
        created_at=now,
        updated_at=now,
    )
    playlist_id = await db_execute_with_retry(query)

    # Audit log
    log_audit(
        AuditAction.PLAYLIST_CREATE,
        client_ip=get_real_ip(request),
        user_agent=request.headers.get("user-agent"),
        res
get_playlist function · python · L8572-L8630 (59 LOC)
api/admin.py
async def get_playlist(request: Request, playlist_id: int) -> PlaylistDetailResponse:
    """Get a playlist with its videos."""
    # Get playlist
    playlist = await fetch_one_with_retry(
        playlists.select().where(playlists.c.id == playlist_id).where(playlists.c.deleted_at.is_(None))
    )
    if not playlist:
        raise HTTPException(status_code=404, detail="Playlist not found")

    # Get videos in playlist
    video_query = sa.text("""
        SELECT
            v.id, v.title, v.slug, v.duration, v.status,
            pi.position
        FROM playlist_items pi
        JOIN videos v ON v.id = pi.video_id
        WHERE pi.playlist_id = :playlist_id AND v.deleted_at IS NULL
        ORDER BY pi.position ASC
    """).bindparams(playlist_id=playlist_id)
    video_rows = await fetch_all_with_retry(video_query)

    # Build video list
    video_list = []
    total_duration = 0.0
    for vrow in video_rows:
        thumbnail_url = f"{get_video_url_prefix()}/videos/{vrow['slug']}/
update_playlist function · python · L8635-L8719 (85 LOC)
api/admin.py
async def update_playlist(request: Request, playlist_id: int, data: PlaylistUpdate) -> PlaylistResponse:
    """Update a playlist."""
    # Verify playlist exists
    existing = await fetch_one_with_retry(
        playlists.select().where(playlists.c.id == playlist_id).where(playlists.c.deleted_at.is_(None))
    )
    if not existing:
        raise HTTPException(status_code=404, detail="Playlist not found")

    # Build update values
    update_values = {"updated_at": datetime.now(timezone.utc)}
    changes = {}

    if data.title is not None:
        new_slug = slugify(data.title)
        # Check for duplicate slug
        duplicate = await fetch_one_with_retry(
            playlists.select()
            .where(playlists.c.slug == new_slug)
            .where(playlists.c.id != playlist_id)
            .where(playlists.c.deleted_at.is_(None))
        )
        if duplicate:
            raise HTTPException(status_code=400, detail="Playlist with this title already exists")
        update
delete_playlist function · python · L8724-L8749 (26 LOC)
api/admin.py
async def delete_playlist(request: Request, playlist_id: int):
    """Soft delete a playlist by stamping its ``deleted_at`` column.

    Args:
        request: Incoming request; supplies the client IP and user agent
            recorded in the audit entry.
        playlist_id: ID of the playlist to delete.

    Returns:
        ``{"status": "ok"}`` once the row has been marked deleted.

    Raises:
        HTTPException: 404 when no non-deleted playlist has this ID.
    """
    # Look up the playlist, ignoring rows that are already soft deleted
    active_playlist_query = (
        playlists.select()
        .where(playlists.c.id == playlist_id)
        .where(playlists.c.deleted_at.is_(None))
    )
    playlist_row = await fetch_one_with_retry(active_playlist_query)
    if not playlist_row:
        raise HTTPException(status_code=404, detail="Playlist not found")

    # The row itself is kept; only the deletion timestamp is set
    deletion_time = datetime.now(timezone.utc)
    mark_deleted = playlists.update().where(playlists.c.id == playlist_id).values(deleted_at=deletion_time)
    await db_execute_with_retry(mark_deleted)

    # Record who deleted which playlist
    caller_ip = get_real_ip(request)
    caller_agent = request.headers.get("user-agent")
    log_audit(
        AuditAction.PLAYLIST_DELETE,
        client_ip=caller_ip,
        user_agent=caller_agent,
        resource_type="playlist",
        resource_id=playlist_id,
        resource_name=playlist_row["slug"],
        details={"title": playlist_row["title"]},
    )

    return {"status": "ok"}
get_playlist_videos function · python · L8754-L8786 (33 LOC)
api/admin.py
async def get_playlist_videos(request: Request, playlist_id: int) -> List[PlaylistVideoInfo]:
    """Get all videos in a playlist ordered by position."""
    # Verify playlist exists
    playlist = await fetch_one_with_retry(
        playlists.select().where(playlists.c.id == playlist_id).where(playlists.c.deleted_at.is_(None))
    )
    if not playlist:
        raise HTTPException(status_code=404, detail="Playlist not found")

    # Get videos
    query = sa.text("""
        SELECT
            v.id, v.title, v.slug, v.duration, v.status,
            pi.position
        FROM playlist_items pi
        JOIN videos v ON v.id = pi.video_id
        WHERE pi.playlist_id = :playlist_id AND v.deleted_at IS NULL
        ORDER BY pi.position ASC
    """).bindparams(playlist_id=playlist_id)
    rows = await fetch_all_with_retry(query)

    return [
        PlaylistVideoInfo(
            id=row["id"],
            title=row["title"],
            slug=row["slug"],
            thumbnail_url=f"{get_vi
add_video_to_playlist function · python · L8791-L8868 (78 LOC)
api/admin.py
async def add_video_to_playlist(request: Request, playlist_id: int, data: AddVideoToPlaylistRequest):
    """Add a video to a playlist."""
    # Verify playlist exists
    playlist = await fetch_one_with_retry(
        playlists.select().where(playlists.c.id == playlist_id).where(playlists.c.deleted_at.is_(None))
    )
    if not playlist:
        raise HTTPException(status_code=404, detail="Playlist not found")

    # Verify video exists
    video = await fetch_one_with_retry(
        videos.select().where(videos.c.id == data.video_id).where(videos.c.deleted_at.is_(None))
    )
    if not video:
        raise HTTPException(status_code=404, detail="Video not found")

    # Check if video already in playlist
    existing = await fetch_one_with_retry(
        playlist_items.select()
        .where(playlist_items.c.playlist_id == playlist_id)
        .where(playlist_items.c.video_id == data.video_id)
    )
    if existing:
        raise HTTPException(status_code=400, detail="Video already
remove_video_from_playlist function · python · L8873-L8926 (54 LOC)
api/admin.py
async def remove_video_from_playlist(request: Request, playlist_id: int, video_id: int):
    """Remove a video from a playlist."""
    # Verify playlist exists
    playlist = await fetch_one_with_retry(
        playlists.select().where(playlists.c.id == playlist_id).where(playlists.c.deleted_at.is_(None))
    )
    if not playlist:
        raise HTTPException(status_code=404, detail="Playlist not found")

    # Get the item to remove
    item = await fetch_one_with_retry(
        playlist_items.select()
        .where(playlist_items.c.playlist_id == playlist_id)
        .where(playlist_items.c.video_id == video_id)
    )
    if not item:
        raise HTTPException(status_code=404, detail="Video not in playlist")

    removed_position = item["position"]

    async with database.transaction():
        # Delete the item
        await database.execute(
            playlist_items.delete()
            .where(playlist_items.c.playlist_id == playlist_id)
            .where(playlist_items.c.vi
If a scraper extracted this row, it came from Repobility (https://repobility.com)
reorder_playlist function · python · L8931-L8995 (65 LOC)
api/admin.py
async def reorder_playlist(request: Request, playlist_id: int, data: ReorderPlaylistRequest):
    """Reorder videos in a playlist."""
    # Verify playlist exists
    playlist = await fetch_one_with_retry(
        playlists.select().where(playlists.c.id == playlist_id).where(playlists.c.deleted_at.is_(None))
    )
    if not playlist:
        raise HTTPException(status_code=404, detail="Playlist not found")

    # Get current items
    current_items = await fetch_all_with_retry(
        playlist_items.select().where(playlist_items.c.playlist_id == playlist_id)
    )
    current_video_ids = {item["video_id"] for item in current_items}

    # Validate all video IDs are in the playlist
    requested_ids = set(data.video_ids)
    if requested_ids != current_video_ids:
        missing = current_video_ids - requested_ids
        extra = requested_ids - current_video_ids
        errors = []
        if missing:
            errors.append(f"Missing video IDs: {sorted(missing)}")
        if extra
list_chapters function · python · L9003-L9036 (34 LOC)
api/admin.py
async def list_chapters(request: Request, video_id: int):
    """List all chapters for a video, ordered by position."""
    # Verify video exists
    video = await fetch_one_with_retry(
        videos.select().where(videos.c.id == video_id).where(videos.c.deleted_at.is_(None))
    )
    if not video:
        raise HTTPException(status_code=404, detail="Video not found")

    # Fetch chapters ordered by position
    chapter_rows = await fetch_all_with_retry(
        chapters.select().where(chapters.c.video_id == video_id).order_by(chapters.c.position)
    )

    chapter_list = [
        ChapterResponse(
            id=row["id"],
            video_id=row["video_id"],
            title=row["title"],
            description=row["description"],
            start_time=row["start_time"],
            end_time=row["end_time"],
            position=row["position"],
            created_at=row["created_at"],
            updated_at=row["updated_at"],
        )
        for row in chapter_rows
    ]
create_chapter function · python · L9041-L9119 (79 LOC)
api/admin.py
async def create_chapter(request: Request, video_id: int, data: ChapterCreate):
    """Create a new chapter for a video."""
    # Verify video exists
    video = await fetch_one_with_retry(
        videos.select().where(videos.c.id == video_id).where(videos.c.deleted_at.is_(None))
    )
    if not video:
        raise HTTPException(status_code=404, detail="Video not found")

    # Validate start_time against video duration
    if video["duration"] and data.start_time >= video["duration"]:
        raise HTTPException(
            status_code=400,
            detail=f"start_time ({data.start_time}s) must be less than video duration ({video['duration']}s)",
        )

    # Check chapter limit
    chapter_count = await fetch_val_with_retry(
        sa.select(sa.func.count()).select_from(chapters).where(chapters.c.video_id == video_id)
    )
    if chapter_count >= MAX_CHAPTERS_PER_VIDEO:
        raise HTTPException(status_code=400, detail=f"Maximum chapters per video ({MAX_CHAPTERS_PER_VI
get_chapter function · python · L9124-L9142 (19 LOC)
api/admin.py
async def get_chapter(request: Request, video_id: int, chapter_id: int):
    """Fetch a single chapter of a video.

    Args:
        request: Incoming request (not read by this handler).
        video_id: ID of the video the chapter must belong to.
        chapter_id: ID of the chapter to fetch.

    Returns:
        A ChapterResponse populated from the matching chapter row.

    Raises:
        HTTPException: 404 when no chapter matches both IDs.
    """
    # Match on both IDs so a chapter cannot be read through another video
    chapter_lookup = (
        chapters.select()
        .where(chapters.c.id == chapter_id)
        .where(chapters.c.video_id == video_id)
    )
    chapter = await fetch_one_with_retry(chapter_lookup)
    if not chapter:
        raise HTTPException(status_code=404, detail="Chapter not found")

    # Response fields are copied one-to-one from the row's columns
    response_columns = (
        "id",
        "video_id",
        "title",
        "description",
        "start_time",
        "end_time",
        "position",
        "created_at",
        "updated_at",
    )
    return ChapterResponse(**{column: chapter[column] for column in response_columns})
update_chapter function · python · L9147-L9205 (59 LOC)
api/admin.py
async def update_chapter(request: Request, video_id: int, chapter_id: int, data: ChapterUpdate):
    """Update an existing chapter."""
    # Verify chapter exists
    chapter = await fetch_one_with_retry(
        chapters.select().where(chapters.c.id == chapter_id).where(chapters.c.video_id == video_id)
    )
    if not chapter:
        raise HTTPException(status_code=404, detail="Chapter not found")

    # Build update values
    update_values = {"updated_at": datetime.now(timezone.utc)}
    if data.title is not None:
        update_values["title"] = data.title
    if data.description is not None:
        update_values["description"] = data.description
    if data.start_time is not None:
        # Validate against video duration
        video = await fetch_one_with_retry(videos.select().where(videos.c.id == video_id))
        if video["duration"] and data.start_time >= video["duration"]:
            raise HTTPException(
                status_code=400,
                detail=f"start_t
delete_chapter function · python · L9210-L9255 (46 LOC)
api/admin.py
async def delete_chapter(request: Request, video_id: int, chapter_id: int):
    """Delete a chapter and reorder remaining chapters."""
    # Verify chapter exists
    chapter = await fetch_one_with_retry(
        chapters.select().where(chapters.c.id == chapter_id).where(chapters.c.video_id == video_id)
    )
    if not chapter:
        raise HTTPException(status_code=404, detail="Chapter not found")

    removed_position = chapter["position"]

    async with database.transaction():
        # Delete the chapter
        await database.execute(chapters.delete().where(chapters.c.id == chapter_id))

        # Reorder remaining chapters (compact positions)
        await database.execute(
            sa.text("""
                UPDATE chapters
                SET position = position - 1
                WHERE video_id = :video_id AND position > :removed_position
            """),
            {"video_id": video_id, "removed_position": removed_position},
        )

        # Check if any chapte
reorder_chapters function · python · L9260-L9332 (73 LOC)
api/admin.py
async def reorder_chapters(request: Request, video_id: int, data: ReorderChaptersRequest):
    """Reorder chapters in a video."""
    # Validate no duplicate IDs in request (prevents position corruption)
    if len(data.chapter_ids) != len(set(data.chapter_ids)):
        raise HTTPException(
            status_code=400,
            detail="Duplicate chapter IDs provided in reorder request",
        )

    # Verify video exists
    video = await fetch_one_with_retry(
        videos.select().where(videos.c.id == video_id).where(videos.c.deleted_at.is_(None))
    )
    if not video:
        raise HTTPException(status_code=404, detail="Video not found")

    async with database.transaction():
        # Verify all chapter_ids belong to this video with row locking
        existing_chapters = await database.fetch_all(
            sa.text("""
                SELECT id FROM chapters
                WHERE video_id = :video_id
                FOR UPDATE
            """),
            {"video_id": 
get_sprite_queue_status function · python · L9571-L9600 (30 LOC)
api/admin.py
async def get_sprite_queue_status(request: Request) -> SpriteQueueStatusResponse:
    """
    Summarize the sprite generation queue by job status.

    Returns a SpriteQueueStatusResponse whose ``pending``, ``processing``,
    ``completed`` and ``failed`` counters hold the number of jobs in that
    status. ``total`` is the sum over every status row the query returns,
    including statuses that have no dedicated counter.
    """
    query = sa.text("""
        SELECT
            status,
            COUNT(*) as count
        FROM sprite_queue
        GROUP BY status
    """)
    status_rows = await fetch_all_with_retry(query)

    # Statuses that map directly onto a same-named counter of the response
    counted_statuses = ("pending", "processing", "completed", "failed")

    summary = SpriteQueueStatusResponse()
    for status_row in status_rows:
        job_status, job_count = status_row["status"], status_row["count"]
        if job_status in counted_statuses:
            setattr(summary, job_status, job_count)
        # Every row contributes to the total, whether or not it has a counter
        summary.total += job_count

    return summary
Powered by Repobility — scan your code at https://repobility.com
queue_sprite_generation function · python · L9605-L9675 (71 LOC)
api/admin.py
async def queue_sprite_generation(
    request: Request,
    video_id: int,
    data: SpriteGenerationRequest = SpriteGenerationRequest(),
) -> dict:
    """
    Queue a video for sprite sheet generation.

    The sprite generator worker will process the job asynchronously.
    """
    # Verify video exists and is ready
    video = await fetch_one_with_retry(
        videos.select().where(videos.c.id == video_id).where(videos.c.deleted_at.is_(None))
    )

    if not video:
        raise HTTPException(status_code=404, detail="Video not found")

    if video["status"] != "ready":
        raise HTTPException(status_code=400, detail=f"Video must be in 'ready' status (current: {video['status']})")

    # Check if already queued and pending
    existing = await fetch_one_with_retry(
        sa.text("SELECT id FROM sprite_queue WHERE video_id = :video_id AND status = 'pending'").bindparams(
            video_id=video_id
        )
    )

    if existing:
        return {
            "status":
queue_all_for_sprites function · python · L9680-L9736 (57 LOC)
api/admin.py
async def queue_all_for_sprites(
    request: Request,
    priority: str = Query("low", pattern="^(high|normal|low)$"),
) -> dict:
    """
    Queue all ready videos without sprites for sprite generation.

    Uses low priority by default to not interfere with individual requests.
    """
    # Find all ready videos without sprite sheets and not already queued
    query = sa.text("""
        SELECT v.id
        FROM videos v
        LEFT JOIN sprite_queue sq ON v.id = sq.video_id AND sq.status = 'pending'
        WHERE v.status = 'ready'
        AND v.deleted_at IS NULL
        AND (v.sprite_sheet_status IS NULL OR v.sprite_sheet_status = 'failed')
        AND sq.id IS NULL
    """)
    rows = await fetch_all_with_retry(query)

    queued_count = 0
    for row in rows:
        await db_execute_with_retry(
            sprite_queue.insert().values(
                video_id=row["id"],
                priority=priority,
                status="pending",
                created_at=datetime.
list_sprite_jobs function · python · L9741-L9811 (71 LOC)
api/admin.py
async def list_sprite_jobs(
    request: Request,
    status: Optional[str] = Query(None, pattern="^(pending|processing|completed|failed|cancelled)$"),
    limit: int = Query(50, ge=1, le=500),
    offset: int = Query(0, ge=0),
) -> SpriteQueueJobsResponse:
    """
    List sprite generation jobs.

    Supports filtering by status and pagination.
    """
    where_clause = ""
    count_params = {}

    if status:
        where_clause = "WHERE sq.status = :status"
        count_params["status"] = status

    # Get total count
    count_query = sa.text(f"""
        SELECT COUNT(*) as total
        FROM sprite_queue sq
        {where_clause}
    """)
    if count_params:
        count_query = count_query.bindparams(**count_params)
    total = await fetch_val_with_retry(count_query)

    # Get jobs with video info
    query = sa.text(f"""
        SELECT
            sq.id,
            sq.video_id,
            v.slug as video_slug,
            v.title as video_title,
            sq.priority,
‹ prev · page 3 / 20 · next ›