Function bodies 1,000 total
get_analytics_client_cache_max_age function · python · L322-L336 (15 LOC)api/admin.py
async def get_analytics_client_cache_max_age() -> int:
"""Get analytics client cache max-age from settings service with fallback to env var.
Uses the main SettingsService which caches all settings for 60 seconds.
This avoids a database round-trip on every analytics request while still
allowing runtime configuration changes.
Returns:
Cache max-age in seconds for analytics API responses.
"""
try:
service = get_settings_service()
return await service.get("analytics.client_cache_max_age", ANALYTICS_CLIENT_CACHE_MAX_AGE)
except Exception:
return ANALYTICS_CLIENT_CACHE_MAX_AGE_check_users_exist function · python · L363-L394 (32 LOC)api/admin.py
async def _check_users_exist() -> bool:
"""
Check if any users exist in the database.
Security: Uses permanent positive caching - once users exist,
the result is cached forever for this process. This prevents
race condition attacks during the setup->authenticated transition.
On database errors, returns True (fail-closed: require authentication).
This prevents attackers from accessing setup endpoint during DB outages.
"""
global _users_exist_cache
# Once users exist, cache permanently (never revert)
if _users_exist_cache is True:
return True
try:
# Use EXISTS for efficiency - stops at first row found
exists = await database.fetch_val("SELECT EXISTS(SELECT 1 FROM users LIMIT 1)")
if exists:
_users_exist_cache = True # Permanent positive cache
return exists
except Exception as e:
# Fail closed: on DB error, require authentication (assume users exist)
# This preventsgenerate_csrf_token function · python · L397-L407 (11 LOC)api/admin.py
def generate_csrf_token(session_token: str) -> str:
"""
Generate a CSRF token from a session token using HMAC.
The CSRF token is cryptographically derived from the session token,
ensuring it's tied to the session and can be validated without database storage.
The HMAC key is derived from ADMIN_API_SECRET for additional security.
"""
if not session_token:
return ""
return hmac.new(CSRF_HMAC_KEY, session_token.encode(), "sha256").hexdigest()validate_csrf_token function · python · L410-L419 (10 LOC)api/admin.py
def validate_csrf_token(session_token: str, csrf_token: str) -> bool:
"""
Validate a CSRF token against the expected token for a session.
Uses constant-time comparison to prevent timing attacks.
"""
if not session_token or not csrf_token:
return False
expected_token = generate_csrf_token(session_token)
return hmac.compare_digest(csrf_token, expected_token)validate_session_token function · python · L422-L447 (26 LOC)api/admin.py
async def validate_session_token(session_token: str) -> bool:
"""
Validate a session token against the database.
Returns True if valid, False if invalid or expired.
Updates last_used_at timestamp for valid sessions.
"""
if not session_token:
return False
now = datetime.now(timezone.utc)
query = admin_sessions.select().where(
admin_sessions.c.session_token == session_token,
admin_sessions.c.expires_at > now,
)
session = await database.fetch_one(query)
if not session:
return False
# Update last_used_at
update_query = (
admin_sessions.update().where(admin_sessions.c.session_token == session_token).values(last_used_at=now)
)
await database.execute(update_query)
return Truecreate_admin_session function · python · L450-L476 (27 LOC)api/admin.py
async def create_admin_session(
ip_address: Optional[str] = None,
user_agent: Optional[str] = None,
) -> str:
"""
Create a new admin session and return the session token.
"""
session_token = secrets.token_urlsafe(48) # 64 chars base64
now = datetime.now(timezone.utc)
expires_at = now + timedelta(hours=ADMIN_SESSION_EXPIRY_HOURS)
query = admin_sessions.insert().values(
session_token=session_token,
created_at=now,
expires_at=expires_at,
last_used_at=now,
ip_address=ip_address[:45] if ip_address else None,
user_agent=user_agent[:512] if user_agent else None,
)
await database.execute(query)
security_logger.info(
"Admin session created",
extra={"event": "session_created", "client_ip": ip_address, "expires_at": expires_at.isoformat()},
)
return session_tokendelete_admin_session function · python · L479-L484 (6 LOC)api/admin.py
async def delete_admin_session(session_token: str) -> None:
"""
Delete an admin session if it exists.
"""
query = admin_sessions.delete().where(admin_sessions.c.session_token == session_token)
await database.execute(query)Same scanner, your repo: https://repobility.com — Repobility
cleanup_expired_sessions function · python · L487-L560 (74 LOC)api/admin.py
async def cleanup_expired_sessions() -> int:
"""
Delete expired sessions and related auth tokens. Returns the total number of records deleted.
Called periodically to clean up stale authentication data.
Cleans up:
- Legacy admin_sessions (expired)
- User sessions (refresh token expired or revoked > 7 days ago)
- OIDC states (expired, unused)
- Password reset tokens (expired or used > 7 days ago)
"""
now = datetime.now(timezone.utc)
total_deleted = 0
cutoff = now - timedelta(days=7)
# Clean up legacy admin sessions
query = admin_sessions.delete().where(admin_sessions.c.expires_at < now)
result = await database.execute(query)
if result:
total_deleted += result
# Clean up user sessions (Issue #200)
# Delete sessions where:
# 1. Refresh token has expired (user can't renew session)
# 2. Session was revoked more than 7 days ago (audit trail retention)
try:
user_session_query = user_sessions.AdminAuthMiddleware._parse_cookies method · python · L595-L605 (11 LOC)api/admin.py
def _parse_cookies(self, cookie_header: bytes) -> dict:
"""Parse Cookie header into a dict."""
cookies = {}
if cookie_header:
cookie_str = cookie_header.decode("utf-8", errors="ignore")
for item in cookie_str.split(";"):
item = item.strip()
if "=" in item:
key, value = item.split("=", 1)
cookies[key.strip()] = value.strip()
return cookiesAdminAuthMiddleware.__call__ method · python · L611-L772 (162 LOC)api/admin.py
async def __call__(self, scope, receive, send):
if scope["type"] != "http":
await self.app(scope, receive, send)
return
path = scope.get("path", "")
method = scope.get("method", "")
# Skip auth for non-API paths
if not path.startswith("/api"):
await self.app(scope, receive, send)
return
# Skip auth for OPTIONS (CORS preflight) requests
if method == "OPTIONS":
await self.app(scope, receive, send)
return
# Skip auth for specific public auth endpoints only (not all /api/auth/*)
# This prevents bypass of authentication for sensitive endpoints like
# /api/auth/users, /api/auth/invites, /api/auth/api-keys
public_auth_endpoints = {
# Login and session endpoints
"/api/auth/login",
"/api/auth/check",
"/api/auth/csrf-token",
"/api/v1/auth/login",
"/api/v1/authdelete_video_and_job function · python · L775-L800 (26 LOC)api/admin.py
async def delete_video_and_job(video_id: int) -> None:
"""
Delete a video and all its related records safely.
IMPORTANT: Always use this instead of videos.delete() directly!
SQLite foreign key CASCADE is unreliable with the async databases library
because foreign_keys pragma is per-connection and connections are pooled.
This explicitly deletes related records to prevent orphaned data.
Deletes: quality_progress, transcoding_jobs, playback_sessions,
transcriptions, video_qualities, video_tags, and the video itself.
"""
# Get job_id first (if exists) for quality_progress cleanup
job = await database.fetch_one(transcoding_jobs.select().where(transcoding_jobs.c.video_id == video_id))
if job:
# Delete quality_progress entries first (FK to transcoding_jobs)
await database.execute(quality_progress.delete().where(quality_progress.c.job_id == job["id"]))
# Delete transcoding job
await database.execute(transcoding_jvalidate_content_length function · python · L803-L823 (21 LOC)api/admin.py
def validate_content_length(request: Request) -> None:
"""
Validate Content-Length header against MAX_UPLOAD_SIZE.
This provides early rejection of oversized uploads before the transfer starts,
saving bandwidth for both client and server.
Raises:
HTTPException: 413 if Content-Length exceeds MAX_UPLOAD_SIZE
"""
content_length = request.headers.get("content-length")
if content_length:
try:
if int(content_length) > MAX_UPLOAD_SIZE:
max_size_gb = MAX_UPLOAD_SIZE / (1024 * 1024 * 1024)
raise HTTPException(
status_code=413,
detail=f"File too large. Maximum upload size is {max_size_gb:.0f} GB",
)
except ValueError:
pass # Invalid Content-Length header, continue with streaming validationsave_upload_with_size_limit function · python · L826-L867 (42 LOC)api/admin.py
async def save_upload_with_size_limit(file: UploadFile, upload_path: Path, max_size: int = MAX_UPLOAD_SIZE) -> int:
"""
Stream upload to disk with size validation.
Returns the total bytes written.
Raises HTTPException if file exceeds max_size.
"""
total_size = 0
try:
with open(upload_path, "wb") as f:
while True:
chunk = await file.read(UPLOAD_CHUNK_SIZE)
if not chunk:
break
total_size += len(chunk)
if total_size > max_size:
# Clean up partial file
f.close()
upload_path.unlink(missing_ok=True)
max_size_gb = max_size / (1024 * 1024 * 1024)
raise HTTPException(
status_code=413,
detail=f"File too large. Maximum upload size is {max_size_gb:.0f} GB",
)
f.write(chunk)
except HTcleanup_orphaned_jobs function · python · L870-L900 (31 LOC)api/admin.py
async def cleanup_orphaned_jobs() -> int:
"""
Remove transcoding jobs that reference non-existent videos.
This can happen when CASCADE deletes fail due to SQLite foreign_keys
pragma not being enabled on all connections (a limitation of the
async databases library with connection pooling).
Returns the number of orphaned jobs deleted.
"""
# Find orphaned jobs (video_id doesn't exist in videos table)
orphaned = await database.fetch_all(
sa.text("""
SELECT tj.id, tj.video_id
FROM transcoding_jobs tj
LEFT JOIN videos v ON tj.video_id = v.id
WHERE v.id IS NULL
""")
)
if not orphaned:
return 0
for job in orphaned:
# Delete quality_progress first
await database.execute(quality_progress.delete().where(quality_progress.c.job_id == job["id"]))
# Delete the orphaned job
await database.execute(transcoding_jobs.delete().where(transcoding_jobs.c.ibuild_retranscode_metadata function · python · L903-L929 (27 LOC)api/admin.py
def build_retranscode_metadata(
video_output_dir: Path,
qualities_to_delete: List[str],
retranscode_all: bool,
) -> str:
"""
Build JSON metadata for deferred retranscode cleanup (Issue #408).
This metadata is stored in the transcoding_jobs.retranscode_metadata field
and is read by claim_job() to perform cleanup when the worker claims the job.
Args:
video_dir: Path to the video's output directory
qualities_to_delete: List of quality names to delete (e.g., ["1080p", "720p"])
retranscode_all: Whether this is a full retranscode (deletes all files including thumbnail)
Returns:
JSON string with cleanup instructions
"""
return json.dumps(
{
"retranscode_all": retranscode_all,
"qualities_to_delete": qualities_to_delete,
"delete_transcription": retranscode_all,
"video_dir": str(video_output_dir),
}
)All rows scored by the Repobility analyzer (https://repobility.com)
create_or_reset_transcoding_job function · python · L932-L1045 (114 LOC)api/admin.py
async def create_or_reset_transcoding_job(
video_id: int, priority: str = "normal", retranscode_metadata: Optional[str] = None
) -> None:
"""
Create a new transcoding job or reset an existing one for a video.
Uses ON CONFLICT DO UPDATE for PostgreSQL to handle the case where a job
already exists (e.g., due to upload retry, duplicate submission, or
re-transcode of a video with existing job). For SQLite (tests), catches
IntegrityError and updates the existing job.
This prevents HTTP 500 errors from unique constraint violations (issue #270).
If Redis is configured, also publishes the job to the Redis Streams queue
for instant dispatch to workers.
Args:
video_id: The video ID to create/reset a job for
priority: Job priority ("high", "normal", "low")
retranscode_metadata: Optional JSON string with retranscode cleanup info (Issue #408)
Format: {"retranscode_all": bool, "qualities_to_delete": [...], "delete_tr_periodic_session_cleanup function · python · L1052-L1064 (13 LOC)api/admin.py
async def _periodic_session_cleanup():
"""Background task to periodically clean up expired sessions (admin and user)."""
while True:
try:
# Run cleanup every hour
await asyncio.sleep(3600)
deleted = await cleanup_expired_sessions()
if deleted:
logger.info(f"Cleaned up {deleted} expired sessions")
except asyncio.CancelledError:
break
except Exception as e:
logger.warning(f"Error during session cleanup: {e}")lifespan function · python · L1068-L1177 (110 LOC)api/admin.py
async def lifespan(app: FastAPI):
"""Manage application startup and shutdown."""
global _session_cleanup_task
# Check for deprecated environment variables and warn about migration
check_deprecated_env_vars()
# Validate SESSION_SECRET_KEY is set (required for user authentication)
# This is a critical security requirement - without it, session tokens cannot be signed
if not SESSION_SECRET_KEY or len(SESSION_SECRET_KEY) < 32:
raise RuntimeError(
"VLOG_SESSION_SECRET_KEY is required and must be at least 32 characters. "
"Generate with: openssl rand -base64 32"
)
# Warn about in-memory rate limiting limitations (security issue #446)
if RATE_LIMIT_ENABLED and RATE_LIMIT_STORAGE_URL == "memory://":
logger.warning(
"SECURITY: Rate limiting is using in-memory storage. "
"With multiple API instances, attackers can bypass rate limits by distributing "
"requests across instancdatabase_locked_handler function · python · L1196-L1203 (8 LOC)api/admin.py
async def database_locked_handler(request: Request, exc: DatabaseLockedError):
"""Handle database locked errors with a 503 response."""
logger.warning(f"Database locked error: {exc}")
return JSONResponse(
status_code=503,
content={"detail": "Database temporarily unavailable, please retry"},
headers={"Retry-After": "1"},
)health_check function · python · L1257-L1278 (22 LOC)api/admin.py
async def health_check():
"""
Health check endpoint for monitoring and load balancers.
Returns detailed status of database and storage health.
Returns 503 if any critical component is unhealthy.
"""
result = await check_health()
storage_status = get_storage_status()
return JSONResponse(
status_code=result["status_code"],
content={
"status": "healthy" if result["healthy"] else "unhealthy",
"checks": result["checks"],
"storage": {
"healthy": storage_status["healthy"],
"last_check": storage_status["last_check"],
"error": storage_status["last_error"],
},
},
)metrics_endpoint function · python · L1283-L1330 (48 LOC)api/admin.py
async def metrics_endpoint(request: Request):
"""
Prometheus metrics endpoint.
Returns metrics in Prometheus text format for scraping.
Authentication behavior is controlled by settings:
- metrics.enabled: If false, returns 404 (default: true)
- metrics.auth_required: If true, requires X-Admin-Secret header (default: false)
Security note: For production deployments, consider either:
1. Setting metrics.auth_required=true and using X-Admin-Secret header
2. Network-level isolation (only allow Prometheus server to access /metrics)
Rate limited to 60/minute to prevent brute-force attacks on authentication.
Related: Issue #436
"""
# Check if metrics endpoint is enabled
metrics_enabled = await get_db_setting("metrics.enabled", True)
if not metrics_enabled:
raise HTTPException(status_code=404, detail="Metrics endpoint disabled")
# Check if authentication is required
auth_required = await get_db_setting("metrics.auauth_legacy_logout function · python · L1341-L1365 (25 LOC)api/admin.py
async def auth_legacy_logout(request: Request, response: Response):
"""
Log out legacy admin sessions (for backward compatibility).
"""
# Get session token from cookie
session_token = request.cookies.get(ADMIN_SESSION_COOKIE, "")
if session_token:
await delete_admin_session(session_token)
client_ip = get_real_ip(request)
security_logger.info(
"Admin logout",
extra={"event": "logout", "client_ip": client_ip},
)
# Clear the cookie
response.delete_cookie(
key=ADMIN_SESSION_COOKIE,
path="/",
httponly=True,
secure=SECURE_COOKIES,
samesite="lax",
)
return {"authenticated": False, "message": "Logged out"}get_csrf_token function · python · L1372-L1398 (27 LOC)api/admin.py
async def get_csrf_token(request: Request):
"""
Get a CSRF token for the current session.
The CSRF token must be included in all state-changing requests (POST, PUT, DELETE, PATCH)
via the X-CSRF-Token header. This provides defense-in-depth against CSRF attacks.
Returns 401 if not authenticated.
"""
# If auth is not configured, CSRF is not needed
if not ADMIN_API_SECRET:
return {"csrf_token": "", "required": False}
# Check for X-Admin-Secret header (API clients don't need CSRF - they use header auth)
admin_secret = request.headers.get("x-admin-secret", "")
if admin_secret and hmac.compare_digest(admin_secret, ADMIN_API_SECRET):
return {"csrf_token": "", "required": False}
# Check session cookie and generate CSRF token
session_token = request.cookies.get(ADMIN_SESSION_COOKIE, "")
if session_token:
is_valid = await validate_session_token(session_token)
if is_valid:
csrf_token = generateMethodology: Repobility · https://repobility.com/research/state-of-ai-code-2026/
list_categories function · python · L1406-L1427 (22 LOC)api/admin.py
async def list_categories(request: Request) -> List[CategoryResponse]:
"""List all categories."""
query = sa.text("""
SELECT c.*, COUNT(v.id) as video_count
FROM categories c
LEFT JOIN videos v ON v.category_id = c.id AND v.deleted_at IS NULL
GROUP BY c.id
ORDER BY c.name
""")
rows = await fetch_all_with_retry(query)
return [
CategoryResponse(
id=row["id"],
name=row["name"],
slug=row["slug"],
description=row["description"] or "",
created_at=row["created_at"],
video_count=row["video_count"],
)
for row in rows
]create_category function · python · L1432-L1467 (36 LOC)api/admin.py
async def create_category(request: Request, data: CategoryCreate) -> CategoryResponse:
"""Create a new category."""
slug = slugify(data.name)
# Check for duplicate slug
existing = await fetch_one_with_retry(categories.select().where(categories.c.slug == slug))
if existing:
raise HTTPException(status_code=400, detail="Category with this name already exists")
query = categories.insert().values(
name=data.name,
slug=slug,
description=data.description,
created_at=datetime.now(timezone.utc),
)
category_id = await db_execute_with_retry(query)
# Audit log
log_audit(
AuditAction.CATEGORY_CREATE,
client_ip=get_real_ip(request),
user_agent=request.headers.get("user-agent"),
resource_type="category",
resource_id=category_id,
resource_name=slug,
details={"name": data.name},
)
return CategoryResponse(
id=category_id,
name=data.name,
delete_category function · python · L1472-L1496 (25 LOC)api/admin.py
async def delete_category(request: Request, category_id: int):
"""Delete a category."""
# Verify category exists
existing = await fetch_one_with_retry(categories.select().where(categories.c.id == category_id))
if not existing:
raise HTTPException(status_code=404, detail="Category not found")
# Use transaction to ensure atomicity
async with database.transaction():
# Set videos in this category to uncategorized
await database.execute(videos.update().where(videos.c.category_id == category_id).values(category_id=None))
await database.execute(categories.delete().where(categories.c.id == category_id))
# Audit log
log_audit(
AuditAction.CATEGORY_DELETE,
client_ip=get_real_ip(request),
user_agent=request.headers.get("user-agent"),
resource_type="category",
resource_id=category_id,
resource_name=existing["slug"],
details={"name": existing["name"]},
)
return {"statuslist_tags function · python · L1504-L1525 (22 LOC)api/admin.py
async def list_tags(request: Request) -> List[TagResponse]:
"""List all tags with video counts (including non-ready videos for admin)."""
query = sa.text("""
SELECT t.*, COUNT(vt.video_id) as video_count
FROM tags t
LEFT JOIN video_tags vt ON vt.tag_id = t.id
LEFT JOIN videos v ON v.id = vt.video_id AND v.deleted_at IS NULL
GROUP BY t.id
ORDER BY t.name
""")
rows = await fetch_all_with_retry(query)
return [
TagResponse(
id=row["id"],
name=row["name"],
slug=row["slug"],
created_at=row["created_at"],
video_count=row["video_count"],
)
for row in rows
]create_tag function · python · L1530-L1563 (34 LOC)api/admin.py
async def create_tag(request: Request, data: TagCreate) -> TagResponse:
"""Create a new tag."""
slug = slugify(data.name)
# Check for duplicate slug
existing = await fetch_one_with_retry(tags.select().where(tags.c.slug == slug))
if existing:
raise HTTPException(status_code=400, detail="Tag with this name already exists")
query = tags.insert().values(
name=data.name,
slug=slug,
created_at=datetime.now(timezone.utc),
)
tag_id = await db_execute_with_retry(query)
# Audit log
log_audit(
AuditAction.TAG_CREATE,
client_ip=get_real_ip(request),
user_agent=request.headers.get("user-agent"),
resource_type="tag",
resource_id=tag_id,
resource_name=slug,
details={"name": data.name},
)
return TagResponse(
id=tag_id,
name=data.name,
slug=slug,
created_at=datetime.now(timezone.utc),
video_count=0,
)update_tag function · python · L1568-L1610 (43 LOC)api/admin.py
async def update_tag(request: Request, tag_id: int, data: TagUpdate) -> TagResponse:
"""Update a tag name."""
# Verify tag exists
existing = await fetch_one_with_retry(tags.select().where(tags.c.id == tag_id))
if not existing:
raise HTTPException(status_code=404, detail="Tag not found")
new_slug = slugify(data.name)
# Check for duplicate slug (exclude current tag)
duplicate = await fetch_one_with_retry(tags.select().where(tags.c.slug == new_slug).where(tags.c.id != tag_id))
if duplicate:
raise HTTPException(status_code=400, detail="Tag with this name already exists")
await db_execute_with_retry(tags.update().where(tags.c.id == tag_id).values(name=data.name, slug=new_slug))
# Get video count
count_query = (
sa.select(sa.func.count(sa.distinct(videos.c.id)))
.select_from(video_tags.join(videos, videos.c.id == video_tags.c.video_id))
.where(video_tags.c.tag_id == tag_id)
.where(videos.c.deleteddelete_tag function · python · L1615-L1640 (26 LOC)api/admin.py
async def delete_tag(request: Request, tag_id: int):
"""Delete a tag. Videos with this tag will have it removed."""
# Verify tag exists
existing = await fetch_one_with_retry(tags.select().where(tags.c.id == tag_id))
if not existing:
raise HTTPException(status_code=404, detail="Tag not found")
# Use transaction to ensure atomicity
async with database.transaction():
# Delete video_tags entries first (FK constraint)
await database.execute(video_tags.delete().where(video_tags.c.tag_id == tag_id))
# Delete the tag
await database.execute(tags.delete().where(tags.c.id == tag_id))
# Audit log
log_audit(
AuditAction.TAG_DELETE,
client_ip=get_real_ip(request),
user_agent=request.headers.get("user-agent"),
resource_type="tag",
resource_id=tag_id,
resource_name=existing["slug"],
details={"name": existing["name"]},
)
return {"status": "ok"}list_all_videos function · python · L1648-L1766 (119 LOC)api/admin.py
async def list_all_videos(
request: Request,
status: Optional[str] = None,
limit: int = Query(default=100, ge=1, le=500, description="Max items per page"),
offset: int = Query(default=0, ge=0, description="Number of items to skip (deprecated, use cursor)"),
cursor: Optional[str] = Query(
default=None,
description="Cursor for pagination (more efficient than offset for large datasets). "
"Use next_cursor from previous response.",
),
include_total: bool = Query(
default=False, description="Include total count in response (expensive for large datasets)"
),
) -> PaginatedVideoListResponse:
"""
List all videos (including non-ready ones for admin).
Pagination:
- cursor: Use cursor-based pagination for efficient traversal of large datasets.
Pass the next_cursor from the previous response to get the next page.
- offset: Legacy offset-based pagination (deprecated, use cursor instead).
When cursor is If a scraper extracted this row, it came from Repobility (https://repobility.com)
list_archived_videos function · python · L1771-L1806 (36 LOC)api/admin.py
async def list_archived_videos(
request: Request,
limit: int = Query(default=100, ge=1, le=500, description="Max items per page"),
offset: int = Query(default=0, ge=0, description="Number of items to skip"),
):
"""List all soft-deleted videos in archive.
NOTE: This route must be defined before /api/videos/{video_id}
to prevent "archived" from being matched as a video_id.
"""
query = (
videos.select()
.where(videos.c.deleted_at.is_not(None))
.order_by(videos.c.deleted_at.desc())
.limit(limit)
.offset(offset)
)
rows = await fetch_all_with_retry(query)
# Get total count of archived videos
count_query = sa.select(sa.func.count()).select_from(videos).where(videos.c.deleted_at.is_not(None))
total = await fetch_val_with_retry(count_query)
return {
"videos": [
{
"id": row["id"],
"title": row["title"],
"slug": row["slug"],
get_video function · python · L1811-L1871 (61 LOC)api/admin.py
async def get_video(request: Request, video_id: int) -> VideoResponse:
"""Get video details."""
query = (
sa.select(
videos,
categories.c.name.label("category_name"),
)
.select_from(videos.outerjoin(categories, videos.c.category_id == categories.c.id))
.where(videos.c.id == video_id)
)
row = await fetch_one_with_retry(query)
if not row:
raise HTTPException(status_code=404, detail="Video not found")
quality_rows = await fetch_all_with_retry(video_qualities.select().where(video_qualities.c.video_id == video_id))
qualities = [
VideoQualityResponse(
quality=q["quality"],
width=q["width"],
height=q["height"],
bitrate=q["bitrate"],
)
for q in quality_rows
]
# Get CDN URL prefix for video streaming content (Issue #222)
video_url_prefix = await get_video_url_prefix()
return VideoResponse(
id=row["id"],
upload_video function · python · L1876-L2058 (183 LOC)api/admin.py
async def upload_video(
request: Request,
file: UploadFile = File(...),
title: str = Form(...),
description: str = Form(""),
category_id: Optional[int] = Form(None),
):
"""Upload a new video for processing."""
# Early rejection based on Content-Length header (if provided)
validate_content_length(request)
# Check storage availability before accepting upload
if not await check_storage_available():
raise HTTPException(
status_code=503,
detail="Video storage temporarily unavailable. Please try again later.",
headers={"Retry-After": "30"},
)
# Validate file extension
file_ext = Path(file.filename).suffix.lower() if file.filename else ""
if not file_ext:
file_ext = ".mp4" # Default extension
if file_ext not in ALLOWED_VIDEO_EXTENSIONS:
raise HTTPException(
status_code=400,
detail=f"Invalid file type '{file_ext}'. Allowed: {', '.join(sorted(ALLupdate_video function · python · L2063-L2159 (97 LOC)api/admin.py
async def update_video(
request: Request,
video_id: int,
title: Optional[str] = Form(None),
description: Optional[str] = Form(None),
category_id: Optional[int] = Form(None),
published_at: Optional[str] = Form(None),
is_featured: Optional[bool] = Form(None), # Issue #413 Phase 3
comments_enabled: Optional[str] = Form(None), # Issue #213: "true", "false", or "inherit"
ratings_enabled: Optional[str] = Form(None), # Issue #213: "true", "false", or "inherit"
):
"""Update video metadata."""
update_data = {}
if title is not None:
if len(title.strip()) == 0:
raise HTTPException(status_code=400, detail="Title is required")
if len(title) > MAX_TITLE_LENGTH:
raise HTTPException(status_code=400, detail=f"Title must be {MAX_TITLE_LENGTH} characters or less")
update_data["title"] = title
if description is not None:
if len(description) > MAX_DESCRIPTION_LENGTH:
raise HTTPExceptiopublish_video function · python · L2164-L2190 (27 LOC)api/admin.py
async def publish_video(request: Request, video_id: int):
"""Publish a video (make it visible on the public site)."""
video = await fetch_one_with_retry(videos.select().where(videos.c.id == video_id))
if not video:
raise HTTPException(status_code=404, detail="Video not found")
if video["deleted_at"] is not None:
raise HTTPException(status_code=400, detail="Cannot publish a deleted video")
# Idempotent: skip if already published
if video["published_at"] is not None:
return {"status": "ok", "published": True}
await db_execute_with_retry(
videos.update().where(videos.c.id == video_id).values(published_at=datetime.now(timezone.utc))
)
log_audit(
AuditAction.VIDEO_UPDATE,
client_ip=get_real_ip(request),
user_agent=request.headers.get("user-agent"),
resource_type="video",
resource_id=video_id,
details={"action": "publish"},
)
return {"status": "ok", "published": Trunpublish_video function · python · L2195-L2219 (25 LOC)api/admin.py
async def unpublish_video(request: Request, video_id: int):
"""Unpublish a video (hide it from the public site)."""
video = await fetch_one_with_retry(videos.select().where(videos.c.id == video_id))
if not video:
raise HTTPException(status_code=404, detail="Video not found")
if video["deleted_at"] is not None:
raise HTTPException(status_code=400, detail="Cannot unpublish a deleted video")
# Idempotent: skip if already unpublished
if video["published_at"] is None:
return {"status": "ok", "published": False}
await db_execute_with_retry(videos.update().where(videos.c.id == video_id).values(published_at=None))
log_audit(
AuditAction.VIDEO_UPDATE,
client_ip=get_real_ip(request),
user_agent=request.headers.get("user-agent"),
resource_type="video",
resource_id=video_id,
details={"action": "unpublish"},
)
return {"status": "ok", "published": False}get_video_tags function · python · L2224-L2239 (16 LOC)api/admin.py
async def get_video_tags(request: Request, video_id: int) -> List[VideoTagInfo]:
"""Get all tags for a video."""
# Verify video exists
video = await fetch_one_with_retry(videos.select().where(videos.c.id == video_id))
if not video:
raise HTTPException(status_code=404, detail="Video not found")
query = (
sa.select(tags.c.id, tags.c.name, tags.c.slug)
.select_from(video_tags.join(tags, video_tags.c.tag_id == tags.c.id))
.where(video_tags.c.video_id == video_id)
.order_by(tags.c.name)
)
rows = await fetch_all_with_retry(query)
return [VideoTagInfo(id=row["id"], name=row["name"], slug=row["slug"]) for row in rows]set_video_tags function · python · L2244-L2290 (47 LOC)api/admin.py
async def set_video_tags(request: Request, video_id: int, data: VideoTagsUpdate) -> List[VideoTagInfo]:
"""Set tags for a video (replaces all existing tags)."""
# Verify video exists
video = await fetch_one_with_retry(videos.select().where(videos.c.id == video_id))
if not video:
raise HTTPException(status_code=404, detail="Video not found")
# Verify all tag_ids exist
if data.tag_ids:
existing_tags = await fetch_all_with_retry(tags.select().where(tags.c.id.in_(data.tag_ids)))
existing_ids = {t["id"] for t in existing_tags}
missing_ids = set(data.tag_ids) - existing_ids
if missing_ids:
raise HTTPException(status_code=400, detail=f"Tag IDs not found: {sorted(missing_ids)}")
# Replace all tags in a transaction
async with database.transaction():
# Remove existing tags
await database.execute(video_tags.delete().where(video_tags.c.video_id == video_id))
# Add new tags
if data.Same scanner, your repo: https://repobility.com — Repobility
remove_video_tag function · python · L2295-L2323 (29 LOC)api/admin.py
async def remove_video_tag(request: Request, video_id: int, tag_id: int):
"""Remove a single tag from a video."""
# Verify video exists
video = await fetch_one_with_retry(videos.select().where(videos.c.id == video_id))
if not video:
raise HTTPException(status_code=404, detail="Video not found")
# Verify tag exists
tag = await fetch_one_with_retry(tags.select().where(tags.c.id == tag_id))
if not tag:
raise HTTPException(status_code=404, detail="Tag not found")
# Remove the tag association
await db_execute_with_retry(
video_tags.delete().where(sa.and_(video_tags.c.video_id == video_id, video_tags.c.tag_id == tag_id))
)
# Audit log
log_audit(
AuditAction.VIDEO_TAGS_UPDATE,
client_ip=get_real_ip(request),
user_agent=request.headers.get("user-agent"),
resource_type="video",
resource_id=video_id,
resource_name=video["slug"],
details={"removed_tag_id": tag_id, "re_get_video_source_path function · python · L2329-L2359 (31 LOC)api/admin.py
def _get_video_source_path(video_id: int, slug: str) -> Optional[Path]:
"""
Find the source video file for thumbnail generation.
Checks in order:
1. Original upload in UPLOADS_DIR
2. Highest quality HLS variant in VIDEOS_DIR
Returns None if no source is available.
"""
# First check uploads directory for original file
for ext in SUPPORTED_VIDEO_EXTENSIONS:
upload_path = UPLOADS_DIR / f"{video_id}{ext}"
if upload_path.exists():
return upload_path
# Fall back to highest quality HLS variant
video_dir = VIDEOS_DIR / slug
if not video_dir.exists():
return None
# Check for original quality first, then descending quality order
quality_order = ["original", "2160p", "1440p", "1080p", "720p", "480p", "360p"]
for quality in quality_order:
playlist = video_dir / f"{quality}.m3u8"
if playlist.exists():
# Verify segments exist before returning playlist (ffmpeg can read HLS dget_thumbnail_info function · python · L2371-L2388 (18 LOC)api/admin.py
async def get_thumbnail_info(request: Request, video_id: int) -> ThumbnailInfoResponse:
"""Get current thumbnail information for a video."""
video = await fetch_one_with_retry(videos.select().where(videos.c.id == video_id))
if not video:
raise HTTPException(status_code=404, detail="Video not found")
thumbnail_url = None
if video["status"] == VideoStatus.READY:
thumbnail_path = VIDEOS_DIR / video["slug"] / "thumbnail.jpg"
if thumbnail_path.exists():
thumbnail_url = f"/videos/{video['slug']}/thumbnail.jpg"
return ThumbnailInfoResponse(
video_id=video_id,
thumbnail_url=thumbnail_url,
thumbnail_source=video["thumbnail_source"] or "auto",
thumbnail_timestamp=video["thumbnail_timestamp"],
)generate_thumbnail_frames function · python · L2393-L2446 (54 LOC)api/admin.py
async def generate_thumbnail_frames(request: Request, video_id: int) -> ThumbnailFramesResponse:
"""
Generate multiple frame options at different timestamps for thumbnail selection.
Returns URLs to temporary frame images at 10%, 25%, 50%, 75%, 90% of video duration.
Frames are stored in VIDEOS_DIR/{slug}/frames/ directory.
"""
video = await fetch_one_with_retry(videos.select().where(videos.c.id == video_id))
if not video:
raise HTTPException(status_code=404, detail="Video not found")
duration = video["duration"]
if not duration or duration <= 0:
raise HTTPException(status_code=400, detail="Video has no duration information")
# Find source file
source_path = _get_video_source_path(video_id, video["slug"])
if not source_path:
raise HTTPException(
status_code=400,
detail="No source video available for frame extraction. Original upload may have been deleted.",
)
# Create frames dupload_custom_thumbnail function · python · L2451-L2560 (110 LOC)api/admin.py
async def upload_custom_thumbnail(
request: Request,
video_id: int,
file: UploadFile = File(...),
) -> ThumbnailResponse:
"""
Upload a custom thumbnail image.
Accepts: JPEG, PNG, WebP (max 10MB)
Converts to JPEG at 640px width, preserving aspect ratio.
"""
video = await fetch_one_with_retry(videos.select().where(videos.c.id == video_id))
if not video:
raise HTTPException(status_code=404, detail="Video not found")
# Validate file extension
if not file.filename:
raise HTTPException(status_code=400, detail="No filename provided")
ext = Path(file.filename).suffix.lower()
if ext not in SUPPORTED_IMAGE_EXTENSIONS:
raise HTTPException(
status_code=400,
detail=f"Unsupported image format. Allowed: {', '.join(sorted(SUPPORTED_IMAGE_EXTENSIONS))}",
)
# Check file size via Content-Length header
content_length = request.headers.get("content-length")
if content_length and iselect_thumbnail_frame function · python · L2565-L2632 (68 LOC)api/admin.py
async def select_thumbnail_frame(
request: Request,
video_id: int,
timestamp: float = Form(...),
) -> ThumbnailResponse:
"""
Select a frame at the specified timestamp as the thumbnail.
Can use a timestamp from the generated frames or any custom timestamp.
"""
video = await fetch_one_with_retry(videos.select().where(videos.c.id == video_id))
if not video:
raise HTTPException(status_code=404, detail="Video not found")
duration = video["duration"]
if not duration or duration <= 0:
raise HTTPException(status_code=400, detail="Video has no duration information")
# Validate timestamp
if timestamp < 0 or timestamp > duration:
raise HTTPException(status_code=400, detail=f"Timestamp must be between 0 and {duration:.2f} seconds")
# Find source file
source_path = _get_video_source_path(video_id, video["slug"])
if not source_path:
raise HTTPException(
status_code=400,
detail=revert_thumbnail function · python · L2637-L2697 (61 LOC)api/admin.py
async def revert_thumbnail(request: Request, video_id: int) -> ThumbnailResponse:
"""
Revert to the auto-generated thumbnail (default timestamp).
Regenerates the thumbnail at the default position (5 seconds or 25% of duration).
"""
video = await fetch_one_with_retry(videos.select().where(videos.c.id == video_id))
if not video:
raise HTTPException(status_code=404, detail="Video not found")
duration = video["duration"]
if not duration or duration <= 0:
raise HTTPException(status_code=400, detail="Video has no duration information")
# Find source file
source_path = _get_video_source_path(video_id, video["slug"])
if not source_path:
raise HTTPException(
status_code=400,
detail="No source video available for thumbnail generation. Original upload may have been deleted.",
)
# Calculate default timestamp (same as transcoder)
default_timestamp = min(5.0, duration / 4)
# Generate delete_video function · python · L2702-L2854 (153 LOC)api/admin.py
async def delete_video(
request: Request,
video_id: int,
permanent: bool = False,
):
"""
Soft-delete a video (moves to archive) or permanently delete if permanent=True.
Args:
video_id: The video ID to delete
permanent: If True, permanently delete. If False (default), soft-delete to archive.
Soft-delete (permanent=False):
- Moves video files to archive directory
- Sets deleted_at timestamp
- Video can be restored within retention period
Permanent delete (permanent=True):
- Removes all files permanently
- Deletes all database records
- Cannot be undone
"""
# Get video info
row = await database.fetch_one(videos.select().where(videos.c.id == video_id))
if not row:
raise HTTPException(status_code=404, detail="Video not found")
if permanent:
# PERMANENT DELETE - remove everything
# First, delete all database records atomically
async with database.transaction():
All rows scored by the Repobility analyzer (https://repobility.com)
bulk_delete_videos function · python · L2862-L2991 (130 LOC)api/admin.py
async def bulk_delete_videos(request: Request, data: BulkDeleteRequest) -> BulkDeleteResponse:
"""
Delete multiple videos at once.
Supports both soft-delete (moves to archive) and permanent delete.
Operations are performed individually to track per-video success/failure.
"""
bulk_operation_id = str(uuid.uuid4())
results = []
deleted_count = 0
failed_count = 0
client_ip = get_real_ip(request)
user_agent = request.headers.get("user-agent")
for video_id in data.video_ids:
try:
# Get video info
row = await database.fetch_one(videos.select().where(videos.c.id == video_id))
if not row:
results.append(BulkOperationResult(video_id=video_id, success=False, error="Video not found"))
failed_count += 1
continue
if data.permanent:
# PERMANENT DELETE
async with database.transaction():
job = awaitbulk_update_videos function · python · L2996-L3080 (85 LOC)api/admin.py
async def bulk_update_videos(request: Request, data: BulkUpdateRequest) -> BulkUpdateResponse:
"""
Update multiple videos with the same values.
Supports updating category, published_at, and unpublishing.
"""
bulk_operation_id = str(uuid.uuid4())
results = []
updated_count = 0
failed_count = 0
client_ip = get_real_ip(request)
user_agent = request.headers.get("user-agent")
# Validate category exists if provided
if data.category_id is not None and data.category_id > 0:
existing_category = await database.fetch_one(categories.select().where(categories.c.id == data.category_id))
if not existing_category:
raise HTTPException(status_code=400, detail=f"Category with ID {data.category_id} does not exist")
# Build update values
update_values = {}
if data.category_id is not None:
update_values["category_id"] = data.category_id if data.category_id > 0 else None
if data.unpublish:
update_vabulk_retranscode_videos function · python · L3085-L3199 (115 LOC)api/admin.py
async def bulk_retranscode_videos(request: Request, data: BulkRetranscodeRequest) -> BulkRetranscodeResponse:
"""
Queue multiple videos for re-transcoding.
Videos remain playable until a worker actually claims and starts
processing the job (Issue #408).
"""
bulk_operation_id = str(uuid.uuid4())
results = []
queued_count = 0
failed_count = 0
client_ip = get_real_ip(request)
user_agent = request.headers.get("user-agent")
for video_id in data.video_ids:
try:
# Get video info
row = await database.fetch_one(videos.select().where(videos.c.id == video_id))
if not row:
results.append(BulkOperationResult(video_id=video_id, success=False, error="Video not found"))
failed_count += 1
continue
slug = row["slug"]
source_height = row["source_height"]
video_dir = VIDEOS_DIR / slug
# Check source file exists
page 1 / 20next ›