Function bodies 1,000 total
bulk_restore_videos function · python · L3204-L3304 (101 LOC)api/admin.py
async def bulk_restore_videos(request: Request, data: BulkRestoreRequest) -> BulkRestoreResponse:
"""
Restore multiple soft-deleted videos from archive.
"""
bulk_operation_id = str(uuid.uuid4())
results = []
restored_count = 0
failed_count = 0
client_ip = get_real_ip(request)
user_agent = request.headers.get("user-agent")
for video_id in data.video_ids:
try:
row = await database.fetch_one(videos.select().where(videos.c.id == video_id))
if not row:
results.append(BulkOperationResult(video_id=video_id, success=False, error="Video not found"))
failed_count += 1
continue
if not row["deleted_at"]:
results.append(BulkOperationResult(video_id=video_id, success=False, error="Video is not deleted"))
failed_count += 1
continue
original_deleted_at = row["deleted_at"]
# Update database first
restore_video function · python · L3309-L3389 (81 LOC)api/admin.py
async def restore_video(request: Request, video_id: int):
"""Restore a soft-deleted video from archive."""
row = await database.fetch_one(videos.select().where(videos.c.id == video_id))
if not row:
raise HTTPException(status_code=404, detail="Video not found")
if not row["deleted_at"]:
raise HTTPException(status_code=400, detail="Video is not deleted")
# Store original deleted_at for potential rollback
original_deleted_at = row["deleted_at"]
# Update database FIRST to avoid inconsistent state if file ops fail
await database.execute(videos.update().where(videos.c.id == video_id).values(deleted_at=None))
archive_video_dir = ARCHIVE_DIR / row["slug"]
video_dir = VIDEOS_DIR / row["slug"]
moved_files = [] # Track what we moved for rollback
try:
# Move video files back from archive
if archive_video_dir.exists():
shutil.move(str(archive_video_dir), str(video_dir))
moved_files.append((retry_video function · python · L3394-L3434 (41 LOC)api/admin.py
async def retry_video(request: Request, video_id: int):
"""Retry processing a failed video."""
row = await database.fetch_one(videos.select().where(videos.c.id == video_id))
if not row:
raise HTTPException(status_code=404, detail="Video not found")
if row["status"] != VideoStatus.FAILED:
raise HTTPException(status_code=400, detail="Video is not in failed state")
# Check if source file exists
source_exists = False
for ext in SUPPORTED_VIDEO_EXTENSIONS:
if (UPLOADS_DIR / f"{video_id}{ext}").exists():
source_exists = True
break
if not source_exists:
raise HTTPException(status_code=400, detail="Source file no longer exists")
# Reset status to pending
await database.execute(
videos.update()
.where(videos.c.id == video_id)
.values(
status=VideoStatus.PENDING,
error_message=None,
)
)
# Audit log
log_audit(
AuditAction.re_upload_video function · python · L3439-L3567 (129 LOC)api/admin.py
async def re_upload_video(
request: Request,
video_id: int,
file: UploadFile = File(...),
):
"""
Re-upload a video file, replacing the existing transcoded content.
This will:
- Delete all existing transcoded files (HLS segments, playlists, thumbnail)
- Delete video_qualities, transcoding_jobs, quality_progress, transcriptions
- Save the new file and queue for reprocessing
- Preserve: title, description, category, published_at, created_at, slug
"""
# Early rejection based on Content-Length header (if provided)
validate_content_length(request)
# Validate file extension
file_ext = Path(file.filename).suffix.lower() if file.filename else ""
if not file_ext:
file_ext = ".mp4"
if file_ext not in ALLOWED_VIDEO_EXTENSIONS:
raise HTTPException(
status_code=400,
detail=f"Invalid file type '{file_ext}'. Allowed: {', '.join(sorted(ALLOWED_VIDEO_EXTENSIONS))}",
)
# Get video inget_video_progress function · python · L3572-L3626 (55 LOC)api/admin.py
async def get_video_progress(request: Request, video_id: int) -> TranscodingProgressResponse:
"""Get transcoding progress for a video."""
video = await database.fetch_one(videos.select().where(videos.c.id == video_id))
if not video:
raise HTTPException(status_code=404, detail="Video not found")
# If video is ready or failed, return simple status
if video["status"] in [VideoStatus.READY, VideoStatus.FAILED]:
return TranscodingProgressResponse(
status=video["status"],
progress_percent=100 if video["status"] == VideoStatus.READY else 0,
last_error=sanitize_progress_error(video["error_message"])
if video["status"] == VideoStatus.FAILED
else None,
)
# If pending, return basic pending status
if video["status"] == VideoStatus.PENDING:
return TranscodingProgressResponse(
status=VideoStatus.PENDING,
progress_percent=0,
)
# Get job info fget_video_qualities function · python · L3631-L3665 (35 LOC)api/admin.py
async def get_video_qualities(request: Request, video_id: int) -> VideoQualitiesResponse:
"""Get available and existing qualities for a video."""
video = await database.fetch_one(videos.select().where(videos.c.id == video_id))
if not video:
raise HTTPException(status_code=404, detail="Video not found")
# Get existing transcoded qualities
quality_rows = await database.fetch_all(video_qualities.select().where(video_qualities.c.video_id == video_id))
existing = [
VideoQualityInfo(
name=q["quality"],
width=q["width"],
height=q["height"],
bitrate=q["bitrate"],
status="completed",
)
for q in quality_rows
]
# Determine available qualities based on source resolution
source_height = video["source_height"] or 0
available = ["original"] # Always available
for preset in QUALITY_PRESETS:
if preset["height"] <= source_height:
available.appenretranscode_video function · python · L3670-L3762 (93 LOC)api/admin.py
async def retranscode_video(
request: Request,
video_id: int,
data: RetranscodeRequest,
) -> RetranscodeResponse:
"""
Re-transcode a video, either all qualities or specific ones.
This will queue the video for re-transcoding while keeping it playable
until a worker actually claims and starts processing the job (Issue #408).
The actual cleanup (file deletion, video_qualities deletion, status change)
is deferred to when a worker claims the job via claim_job().
"""
# Get video info
row = await database.fetch_one(videos.select().where(videos.c.id == video_id))
if not row:
raise HTTPException(status_code=404, detail="Video not found")
if row["deleted_at"]:
raise HTTPException(status_code=400, detail="Cannot re-transcode a deleted video")
slug = row["slug"]
video_dir = VIDEOS_DIR / slug
# Check if source file exists
source_file = None
for ext in SUPPORTED_VIDEO_EXTENSIONS:
candidate = UPLORepobility · code-quality intelligence platform · https://repobility.com
get_video_transcript function · python · L3770-L3797 (28 LOC)api/admin.py
async def get_video_transcript(request: Request, video_id: int) -> TranscriptionResponse:
"""Get transcription status and text for a video."""
# Get video
video = await database.fetch_one(videos.select().where(videos.c.id == video_id))
if not video:
raise HTTPException(status_code=404, detail="Video not found")
# Get transcription record
transcription = await database.fetch_one(transcriptions.select().where(transcriptions.c.video_id == video_id))
if not transcription:
return TranscriptionResponse(status=TranscriptionStatus.NONE)
vtt_url = None
if transcription["status"] == TranscriptionStatus.COMPLETED and transcription["vtt_path"]:
vtt_url = f"/videos/{video['slug']}/captions.vtt"
return TranscriptionResponse(
status=transcription["status"],
language=transcription["language"],
text=transcription["transcript_text"],
vtt_url=vtt_url,
word_count=transcription["word_count"],
trigger_transcription function · python · L3802-L3869 (68 LOC)api/admin.py
async def trigger_transcription(request: Request, video_id: int, data: TranscriptionTrigger = None):
"""Manually trigger transcription for a video."""
# Get video
video = await database.fetch_one(videos.select().where(videos.c.id == video_id))
if not video:
raise HTTPException(status_code=404, detail="Video not found")
if video["status"] != VideoStatus.READY:
raise HTTPException(status_code=400, detail="Video must be ready before transcription")
# Check if transcription already exists
existing = await database.fetch_one(transcriptions.select().where(transcriptions.c.video_id == video_id))
if existing:
if existing["status"] == TranscriptionStatus.PROCESSING:
raise HTTPException(status_code=400, detail="Transcription already in progress")
# Reset to pending for re-transcription
await database.execute(
transcriptions.update()
.where(transcriptions.c.video_id == video_id)
update_transcript function · python · L3874-L3909 (36 LOC)api/admin.py
async def update_transcript(request: Request, video_id: int, data: TranscriptionUpdate):
"""Manually edit/correct transcript text and regenerate VTT."""
# Get video
video = await database.fetch_one(videos.select().where(videos.c.id == video_id))
if not video:
raise HTTPException(status_code=404, detail="Video not found")
# Get transcription
transcription = await database.fetch_one(transcriptions.select().where(transcriptions.c.video_id == video_id))
if not transcription:
raise HTTPException(status_code=404, detail="No transcription found for this video")
# Update transcript text
word_count = len(data.text.split())
await database.execute(
transcriptions.update()
.where(transcriptions.c.video_id == video_id)
.values(
transcript_text=data.text,
word_count=word_count,
)
)
# Audit log
log_audit(
AuditAction.TRANSCRIPTION_UPDATE,
client_ip=get_real_idelete_transcript function · python · L3914-L3945 (32 LOC)api/admin.py
async def delete_transcript(request: Request, video_id: int):
"""Delete transcription and VTT file for a video."""
# Get video
video = await database.fetch_one(videos.select().where(videos.c.id == video_id))
if not video:
raise HTTPException(status_code=404, detail="Video not found")
# Get transcription
transcription = await database.fetch_one(transcriptions.select().where(transcriptions.c.video_id == video_id))
if not transcription:
raise HTTPException(status_code=404, detail="No transcription found for this video")
# Delete VTT file if exists
vtt_path = VIDEOS_DIR / video["slug"] / "captions.vtt"
if vtt_path.exists():
vtt_path.unlink()
# Delete transcription record
await database.execute(transcriptions.delete().where(transcriptions.c.video_id == video_id))
# Audit log
log_audit(
AuditAction.TRANSCRIPTION_DELETE,
client_ip=get_real_ip(request),
user_agent=request.headers.get("uanalytics_overview function · python · L3953-L4059 (107 LOC)api/admin.py
async def analytics_overview(request: Request, response: Response) -> AnalyticsOverview:
"""Get global analytics overview."""
# Try to get from cache first
cache_key = "analytics_overview"
cached_data = analytics_cache.get(cache_key)
# Get cache max-age from settings (with env var fallback)
cache_max_age = await get_analytics_client_cache_max_age()
if cached_data is not None:
# Set Cache-Control header for client-side caching
response.headers["Cache-Control"] = f"private, max-age={cache_max_age}"
return AnalyticsOverview(**cached_data)
# Cache miss - compute fresh data
now = datetime.now(timezone.utc)
today_start = now.replace(hour=0, minute=0, second=0, microsecond=0)
week_start = today_start - timedelta(days=7)
month_start = today_start - timedelta(days=30)
# Total views
total_views = await fetch_val_with_retry(sa.select(sa.func.count()).select_from(playback_sessions)) or 0
# Unique viewers
uanalytics_videos function · python · L4064-L4171 (108 LOC)api/admin.py
async def analytics_videos(
request: Request,
response: Response,
limit: int = Query(default=50, ge=1, le=100, description="Max items per page"),
offset: int = Query(default=0, ge=0, description="Number of items to skip"),
sort_by: str = "views",
period: str = "all",
) -> VideoAnalyticsListResponse:
"""Get per-video analytics."""
# Get cache max-age from settings (with env var fallback)
cache_max_age = await get_analytics_client_cache_max_age()
# Try to get from cache first
cache_key = f"analytics_videos:{limit}:{offset}:{sort_by}:{period}"
cached_data = analytics_cache.get(cache_key)
if cached_data is not None:
# Set Cache-Control header for client-side caching
response.headers["Cache-Control"] = f"private, max-age={cache_max_age}"
# Reconstruct response models from cached data
cached_videos = [VideoAnalyticsSummary(**v) for v in cached_data["videos"]]
return VideoAnalyticsListResponse(videosanalytics_video_detail function · python · L4176-L4283 (108 LOC)api/admin.py
async def analytics_video_detail(request: Request, response: Response, video_id: int) -> VideoAnalyticsDetail:
"""Get detailed analytics for a specific video."""
# Get cache max age from settings service
cache_max_age = await get_analytics_client_cache_max_age()
# Try to get from cache first
cache_key = f"analytics_video_detail:{video_id}"
cached_data = analytics_cache.get(cache_key)
if cached_data is not None:
# Set Cache-Control header for client-side caching
response.headers["Cache-Control"] = f"private, max-age={cache_max_age}"
# Reconstruct response models from cached data
quality_breakdown = [QualityBreakdown(**q) for q in cached_data["quality_breakdown"]]
views_over_time = [DailyViews(**v) for v in cached_data["views_over_time"]]
return VideoAnalyticsDetail(
video_id=cached_data["video_id"],
title=cached_data["title"],
duration=cached_data["duration"],
toanalytics_trends function · python · L4288-L4357 (70 LOC)api/admin.py
async def analytics_trends(
request: Request,
response: Response,
period: str = "30d",
video_id: Optional[int] = None,
) -> TrendsResponse:
"""Get time-series analytics data."""
# Get cache max age from settings service
cache_max_age = await get_analytics_client_cache_max_age()
# Try to get from cache first
cache_key = f"analytics_trends:{period}:{video_id or 'all'}"
cached_data = analytics_cache.get(cache_key)
if cached_data is not None:
# Set Cache-Control header for client-side caching
response.headers["Cache-Control"] = f"private, max-age={cache_max_age}"
# Reconstruct response models from cached data
data = [TrendDataPoint(**d) for d in cached_data["data"]]
return TrendsResponse(period=cached_data["period"], data=data)
# Cache miss - compute fresh data
# Validate period to prevent SQL injection (whitelist approach)
valid_periods = {"7d": 7, "30d": 30, "90d": 90}
days = valid_periMethodology: Repobility · https://repobility.com/research/state-of-ai-code-2026/
export_videos function · python · L4362-L4465 (104 LOC)api/admin.py
async def export_videos(
request: Request,
status: Optional[str] = Query(None, description="Filter by status (pending, processing, ready, failed)"),
category_id: Optional[int] = Query(None, description="Filter by category ID"),
include_deleted: bool = Query(False, description="Include soft-deleted videos"),
limit: int = Query(10000, ge=1, le=10000, description="Maximum number of videos to export"),
offset: int = Query(0, ge=0, description="Offset for pagination"),
) -> VideoExportResponse:
"""
Export video metadata as JSON.
Supports filtering by status, category, and deleted state.
For CSV export, use the JSON response and convert client-side.
"""
# Validate status if provided
valid_statuses = [s.value for s in VideoStatus]
if status and status not in valid_statuses:
raise HTTPException(
status_code=400,
detail=f"Invalid status '{status}'. Valid options: {', '.join(valid_statuses)}",
)
parse_worker_capabilities function · python · L4471-L4508 (38 LOC)api/admin.py
def parse_worker_capabilities(metadata_json: Optional[str]) -> dict:
"""
Parse worker metadata JSON and return a dict with hardware info and version tracking.
The metadata can contain:
- capabilities: GPU/hardware info including code_version
- deployment_type: How the worker is deployed (kubernetes, systemd, docker, manual)
"""
result = {
"hwaccel_enabled": False,
"hwaccel_type": None,
"gpu_name": None,
"code_version": None,
"deployment_type": None,
}
if not metadata_json:
return result
try:
data = json.loads(metadata_json)
# Extract deployment_type from top level
result["deployment_type"] = data.get("deployment_type")
# Handle nested structure {"capabilities": {...}} or flat structure
caps = data
if "capabilities" in data and isinstance(data["capabilities"], dict):
caps = data["capabilities"]
result["hwaccel_enabled"] = caps.determine_worker_status function · python · L4511-L4532 (22 LOC)api/admin.py
def determine_worker_status(
    db_status: str, last_heartbeat: Optional[datetime], current_job_id: Optional[int], offline_threshold: datetime
) -> str:
    """Determine the effective worker status based on heartbeat and current job.

    Args:
        db_status: Status stored on the worker row. "disabled" takes precedence
            over every other signal.
        last_heartbeat: Time of the worker's most recent heartbeat, or None if
            none has been recorded.
        current_job_id: ID of the job the worker currently holds, if any.
        offline_threshold: Cut-off instant; a heartbeat older than this means
            the worker is reported as offline.

    Returns:
        One of "disabled", "offline", "active" or "idle".
    """
    if db_status == "disabled":
        return "disabled"
    if not last_heartbeat:
        return "offline"

    def _as_aware(moment: datetime) -> datetime:
        # Naive values are interpreted as UTC so both operands of the comparison
        # below are timezone-aware; mixing naive and aware raises TypeError.
        return moment if moment.tzinfo is not None else moment.replace(tzinfo=timezone.utc)

    if _as_aware(last_heartbeat) < _as_aware(offline_threshold):
        return "offline"

    # Online: "active" when a job is assigned, "idle" otherwise
    return "active" if current_job_id else "idle"
async def list_workers_dashboard(request: Request) -> WorkerDashboardResponse:
"""
List all workers with their status and current activity.
Shows real-time worker status with heartbeat information,
current job assignments, and hardware capabilities.
"""
now = datetime.now(timezone.utc)
offline_threshold = now - timedelta(minutes=WORKER_OFFLINE_THRESHOLD_MINUTES)
# Get all workers
worker_rows = await fetch_all_with_retry(workers.select().order_by(workers.c.last_heartbeat.desc()))
# Batch fetch current job info for all workers with active jobs
current_job_ids = [row["current_job_id"] for row in worker_rows if row["current_job_id"]]
current_jobs_info = {}
if current_job_ids:
job_rows = await database.fetch_all(
sa.select(
transcoding_jobs.c.id,
transcoding_jobs.c.current_step,
transcoding_jobs.c.progress_percent,
videos.c.slug,
videos.clist_active_jobs function · python · L4667-L4760 (94 LOC)api/admin.py
async def list_active_jobs(request: Request) -> ActiveJobsResponse:
"""
List all active transcoding jobs with worker information.
Shows jobs that are pending or being processed, including
which worker is handling each job and progress details.
"""
# Get all jobs for videos that are pending or processing
query = sa.text("""
SELECT
tj.id as job_id,
tj.video_id,
v.slug as video_slug,
v.title as video_title,
v.status as video_status,
tj.worker_id,
tj.current_step,
tj.progress_percent,
tj.started_at,
tj.claimed_at,
tj.attempt_number,
tj.max_attempts,
w.worker_name,
w.metadata as capabilities
FROM transcoding_jobs tj
JOIN videos v ON tj.video_id = v.id
LEFT JOIN workers w ON tj.worker_id = w.worker_id
WHERE v.status IN ('pending', 'processing')
AND get_worker_detail function · python · L4765-L4896 (132 LOC)api/admin.py
async def get_worker_detail(request: Request, worker_id: str) -> WorkerDetailResponse:
"""
Get detailed information about a specific worker.
Includes capabilities, metadata, stats, and recent job history.
"""
# Find worker by UUID
worker = await fetch_one_with_retry(workers.select().where(workers.c.worker_id == worker_id))
if not worker:
raise HTTPException(status_code=404, detail="Worker not found")
now = datetime.now(timezone.utc)
offline_threshold = now - timedelta(minutes=WORKER_OFFLINE_THRESHOLD_MINUTES)
status = determine_worker_status(
worker["status"], worker["last_heartbeat"], worker["current_job_id"], offline_threshold
)
# Parse capabilities and metadata
capabilities = None
if worker["capabilities"]:
try:
capabilities = json.loads(worker["capabilities"])
except (json.JSONDecodeError, TypeError):
# If capabilities is not valid JSON or malformed, leave as None
disable_worker function · python · L4901-L4952 (52 LOC)api/admin.py
async def disable_worker(request: Request, worker_id: str):
"""
Disable a worker, preventing it from claiming new jobs.
The worker's existing claimed job (if any) is released back
to the pending queue.
"""
# Find worker by UUID
worker = await fetch_one_with_retry(workers.select().where(workers.c.worker_id == worker_id))
if not worker:
raise HTTPException(status_code=404, detail="Worker not found")
if worker["status"] == "disabled":
raise HTTPException(status_code=400, detail="Worker is already disabled")
async with database.transaction():
# Mark worker as disabled
await database.execute(
workers.update().where(workers.c.id == worker["id"]).values(status="disabled", current_job_id=None)
)
# Release any claimed job
if worker["current_job_id"]:
job = await database.fetch_one(
transcoding_jobs.select().where(transcoding_jobs.c.id == worker["current_jobenable_worker function · python · L4957-L4982 (26 LOC)api/admin.py
async def enable_worker(request: Request, worker_id: str):
    """
    Put a disabled worker back into service so it can claim jobs again.

    The worker is looked up by its UUID. Responds with 404 when no such worker
    exists and with 400 when the worker is not currently disabled. On success
    the worker's status is set to "active" and an audit entry is written.
    """
    # Locate the worker row by UUID
    lookup = workers.select().where(workers.c.worker_id == worker_id)
    record = await fetch_one_with_retry(lookup)
    if not record:
        raise HTTPException(status_code=404, detail="Worker not found")
    if record["status"] != "disabled":
        raise HTTPException(status_code=400, detail="Worker is not disabled")

    # Flip the status back to active
    reactivate = workers.update().where(workers.c.id == record["id"]).values(status="active")
    await db_execute_with_retry(reactivate)

    # Record who re-enabled the worker; unnamed workers are identified by the
    # first 8 characters of their UUID
    caller_ip = get_real_ip(request)
    caller_agent = request.headers.get("user-agent")
    display_name = record["worker_name"] or record["worker_id"][:8]
    log_audit(
        AuditAction.WORKER_ENABLE,
        client_ip=caller_ip,
        user_agent=caller_agent,
        resource_type="worker",
        resource_id=record["id"],
        resource_name=display_name,
    )

    return {"status": "ok", "message": "Worker enabled"}
delete_worker function · python · L4987-L5058 (72 LOC)api/admin.py
async def delete_worker(
request: Request,
worker_id: str,
revoke_keys: bool = True,
):
"""
Delete a worker and optionally revoke its API keys.
Args:
worker_id: The worker UUID to delete
revoke_keys: If True (default), revoke all API keys. If False, keep keys active.
This will:
- Release any claimed job back to pending
- Revoke all API keys (if revoke_keys=True)
- Delete the worker record
"""
# Find worker by UUID
worker = await fetch_one_with_retry(workers.select().where(workers.c.worker_id == worker_id))
if not worker:
raise HTTPException(status_code=404, detail="Worker not found")
now = datetime.now(timezone.utc)
async with database.transaction():
# Release any claimed job
if worker["current_job_id"]:
job = await database.fetch_one(
transcoding_jobs.select().where(transcoding_jobs.c.id == worker["current_job_id"])
)
if job anrestart_worker function · python · L5066-L5116 (51 LOC)api/admin.py
async def restart_worker(request: Request, worker_id: str):
"""
Send restart command to a specific worker.
The worker will finish its current job (if any) before restarting.
Requires Redis to be configured for pub/sub.
"""
from api.pubsub import publish_worker_command
from api.redis_client import get_redis
# Verify worker exists
worker = await fetch_one_with_retry(workers.select().where(workers.c.worker_id == worker_id))
if not worker:
raise HTTPException(status_code=404, detail="Worker not found")
# Check if Redis is available
redis = await get_redis()
if not redis:
raise HTTPException(status_code=503, detail="Redis not available - worker commands require Redis pub/sub")
# Get current version from capabilities if available
caps = parse_worker_capabilities(worker.get("capabilities"))
current_version = caps.get("code_version") if caps else None
# Publish restart command
success = await publish_wstop_worker function · python · L5121-L5171 (51 LOC)api/admin.py
async def stop_worker(request: Request, worker_id: str):
"""
Send stop command to a specific worker.
The worker will finish its current job (if any) before stopping.
The process manager (systemd, k8s) will NOT restart the worker.
"""
from api.pubsub import publish_worker_command
from api.redis_client import get_redis
# Verify worker exists
worker = await fetch_one_with_retry(workers.select().where(workers.c.worker_id == worker_id))
if not worker:
raise HTTPException(status_code=404, detail="Worker not found")
# Check if Redis is available
redis = await get_redis()
if not redis:
raise HTTPException(status_code=503, detail="Redis not available - worker commands require Redis pub/sub")
# Get current version from capabilities if available
caps = parse_worker_capabilities(worker.get("capabilities"))
current_version = caps.get("code_version") if caps else None
# Publish stop command
success = await prestart_all_workers function · python · L5176-L5225 (50 LOC)api/admin.py
async def restart_all_workers(request: Request):
"""
Broadcast restart command to all active workers.
Each worker will finish its current job before restarting.
This is useful for deploying updates across all workers.
"""
from api.pubsub import publish_worker_command
from api.redis_client import get_redis
# Check if Redis is available
redis = await get_redis()
if not redis:
raise HTTPException(status_code=503, detail="Redis not available - worker commands require Redis pub/sub")
# Get all online workers for deployment logging
online_workers = await fetch_all_with_retry(workers.select().where(workers.c.status.in_(["active", "idle"])))
# Publish broadcast restart command
success = await publish_worker_command("all", "restart")
if not success:
raise HTTPException(status_code=500, detail="Failed to broadcast restart command")
# Log deployment events for each worker
triggered_by = get_real_ip(request)
update_worker function · python · L5230-L5284 (55 LOC)api/admin.py
async def update_worker(request: Request, worker_id: str):
"""
Send update command to a specific worker.
The worker will:
1. Finish its current job (if any)
2. Pull latest code via git
3. Restart to apply updates
This is only available for git-based deployments.
"""
from api.pubsub import publish_worker_command
from api.redis_client import get_redis
# Verify worker exists
worker = await fetch_one_with_retry(workers.select().where(workers.c.worker_id == worker_id))
if not worker:
raise HTTPException(status_code=404, detail="Worker not found")
# Check if Redis is available
redis = await get_redis()
if not redis:
raise HTTPException(status_code=503, detail="Redis not available - worker commands require Redis pub/sub")
# Get current version from capabilities if available
caps = parse_worker_capabilities(worker.get("capabilities"))
current_version = caps.get("code_version") if caps else None
list_deployment_events function · python · L5292-L5332 (41 LOC)api/admin.py
async def list_deployment_events(
request: Request,
worker_id: Optional[str] = Query(None, description="Filter by worker UUID"),
event_type: Optional[str] = Query(None, description="Filter by event type"),
limit: int = Query(default=50, ge=1, le=200, description="Number of events to return"),
):
"""
List recent deployment events.
Deployment events track worker restarts, updates, and version changes.
"""
from api.database import deployment_events
query = deployment_events.select().order_by(deployment_events.c.created_at.desc()).limit(limit)
if worker_id:
query = query.where(deployment_events.c.worker_id == worker_id)
if event_type:
query = query.where(deployment_events.c.event_type == event_type)
rows = await fetch_all_with_retry(query)
events = []
for row in rows:
events.append(
{
"id": row["id"],
"worker_id": row["worker_id"],
"worker_nam_log_deployment_event function · python · L5335-L5361 (27 LOC)api/admin.py
async def _log_deployment_event(
    worker_id: str,
    worker_name: Optional[str],
    event_type: str,
    triggered_by: str = "admin",
    old_version: Optional[str] = None,
    new_version: Optional[str] = None,
    status: str = "pending",
    details: Optional[str] = None,
) -> int:
    """Insert one row into the deployment_events table.

    The row is stamped with the current UTC time as ``created_at``. The value
    returned is whatever ``database.execute`` yields for the insert
    (presumably the new row's ID - confirm against the database layer).
    """
    from api.database import deployment_events

    event_row = {
        "worker_id": worker_id,
        "worker_name": worker_name,
        "event_type": event_type,
        "old_version": old_version,
        "new_version": new_version,
        "status": status,
        "triggered_by": triggered_by,
        "details": details,
        "created_at": datetime.now(timezone.utc),
    }
    insert_stmt = deployment_events.insert().values(**event_row)
    return await database.execute(insert_stmt)
async def get_worker_logs(
request: Request,
worker_id: str,
lines: int = Query(default=100, ge=1, le=1000, description="Number of log lines to fetch"),
):
"""
Fetch recent logs from a worker.
The worker will respond with logs based on its deployment type:
- systemd: journalctl output
- kubernetes: Points to kubectl logs
- docker: Points to docker logs
- manual: Process info and log file locations
Requires the worker to be online and have Redis pub/sub enabled.
"""
from api.pubsub import request_worker_response
from api.redis_client import get_redis
# Verify worker exists
worker = await fetch_one_with_retry(workers.select().where(workers.c.worker_id == worker_id))
if not worker:
raise HTTPException(status_code=404, detail="Worker not found")
# Check if worker is online
if worker["status"] in ("offline", "disabled"):
raise HTTPException(
status_code=400,
detail=f"WorkeSource: Repobility analyzer · https://repobility.com
get_worker_metrics function · python · L5433-L5485 (53 LOC)api/admin.py
async def get_worker_metrics(request: Request, worker_id: str):
"""
Fetch current metrics from a worker.
Returns CPU, memory, disk usage, and GPU metrics (if available).
Requires the worker to be online and have Redis pub/sub enabled.
"""
from api.pubsub import request_worker_response
from api.redis_client import get_redis
# Verify worker exists
worker = await fetch_one_with_retry(workers.select().where(workers.c.worker_id == worker_id))
if not worker:
raise HTTPException(status_code=404, detail="Worker not found")
# Check if worker is online
if worker["status"] in ("offline", "disabled"):
raise HTTPException(
status_code=400,
detail=f"Worker is {worker['status']} - cannot fetch metrics from offline workers",
)
# Check if Redis is available
redis = await get_redis()
if not redis:
raise HTTPException(status_code=503, detail="Redis not available - worker metrics requiresse_progress function · python · L5493-L5596 (104 LOC)api/admin.py
async def sse_progress(
request: Request,
video_ids: Optional[str] = Query(None, description="Comma-separated video IDs to monitor"),
):
"""
Server-Sent Events endpoint for real-time transcoding progress.
Subscribes to progress updates for specified videos (or all if none specified).
Falls back to database polling if Redis is unavailable.
SSE Message Format:
event: progress
data: {"video_id": 123, "progress_percent": 45, ...}
event: heartbeat
data: {"timestamp": "..."}
"""
async def event_generator():
# Send retry interval for client reconnection
yield {"event": "retry", "data": str(SSE_RECONNECT_TIMEOUT_MS)}
# Parse and validate video IDs
vid_list = []
if video_ids:
try:
vid_list = [int(v.strip()) for v in video_ids.split(",") if v.strip()]
except ValueError:
logger.warning(f"Invalid video IDs in SSE request: {video_idsse_workers function · python · L5601-L5680 (80 LOC)api/admin.py
async def sse_workers(request: Request):
"""
Server-Sent Events endpoint for real-time worker status updates.
Provides:
- Worker status changes (active/idle/offline)
- Current job assignments
- Job completion/failure notifications
"""
async def event_generator():
# Send retry interval for client reconnection
yield {"event": "retry", "data": str(SSE_RECONNECT_TIMEOUT_MS)}
# Send initial state
try:
initial_state = await _get_workers_state()
yield {"event": "initial", "data": json.dumps(initial_state)}
except Exception as e:
logger.warning(f"Failed to get initial workers state: {e}")
# Check if Redis is available for pub/sub
redis_available = await is_redis_available()
if redis_available:
# Redis-based real-time updates
subscriber = None
try:
subscriber = await subscribe_to_workers()
if_get_progress_from_database function · python · L5683-L5743 (61 LOC)api/admin.py
async def _get_progress_from_database(video_ids: Optional[str]) -> dict:
"""Get progress data from database for SSE fallback polling."""
# Build query for active videos
query = (
sa.select(
videos.c.id,
videos.c.slug,
videos.c.status,
transcoding_jobs.c.id.label("job_id"),
transcoding_jobs.c.current_step,
transcoding_jobs.c.progress_percent,
transcoding_jobs.c.last_error,
)
.select_from(videos.outerjoin(transcoding_jobs, videos.c.id == transcoding_jobs.c.video_id))
.where(videos.c.status.in_(["pending", "processing"]))
.where(videos.c.deleted_at.is_(None))
)
if video_ids:
try:
ids = [int(v.strip()) for v in video_ids.split(",") if v.strip()]
query = query.where(videos.c.id.in_(ids))
except ValueError:
# If video_ids contains invalid values, ignore the filter and return all active videos
_get_workers_state function · python · L5746-L5848 (103 LOC)api/admin.py
async def _get_workers_state() -> dict:
"""Get current workers state from database for SSE."""
now = datetime.now(timezone.utc)
offline_threshold = now - timedelta(minutes=WORKER_OFFLINE_THRESHOLD_MINUTES)
# Get all workers
rows = await database.fetch_all(workers.select().order_by(workers.c.registered_at.desc()))
workers_list = []
stats = {"total": 0, "active": 0, "idle": 0, "offline": 0, "disabled": 0}
for row in rows:
stats["total"] += 1
# Determine effective status
status = row["status"]
if status == "disabled":
stats["disabled"] += 1
elif status in ("active", "idle", "busy"):
last_hb = row["last_heartbeat"]
if last_hb and last_hb.replace(tzinfo=timezone.utc) < offline_threshold:
status = "offline"
stats["offline"] += 1
elif status == "idle":
stats["idle"] += 1
else:
stats["active"] += get_admin_watermark_settings function · python · L5858-L5889 (32 LOC)api/admin.py
async def get_admin_watermark_settings(request: Request):
"""
Get current watermark configuration.
Returns the current watermark settings from database (with env var fallback).
Supports both image and text watermark types.
"""
# Get watermark settings from database with caching
settings = await get_watermark_settings()
watermark_exists = False
if settings["image"]:
watermark_path = NAS_STORAGE / settings["image"]
watermark_exists = watermark_path.exists()
return {
"enabled": settings["enabled"],
"type": settings["type"],
# Image settings
"image": settings["image"],
"image_exists": watermark_exists,
"image_url": "/api/settings/watermark/image" if watermark_exists else None,
"max_width_percent": settings["max_width_percent"],
# Text settings
"text": settings["text"],
"text_size": settings["text_size"],
"text_color": settings["text_color"],
get_admin_watermark_image function · python · L5894-L5918 (25 LOC)api/admin.py
async def get_admin_watermark_image(request: Request):
    """
    Return the configured watermark image file for preview in the admin UI.

    Responds with 404 when the watermark settings carry no image path, or when
    the file built from NAS_STORAGE and that path does not exist. The media
    type is chosen from the file extension; unknown extensions are served as
    a generic binary stream.
    """
    watermark = await get_watermark_settings()
    image_name = watermark["image"]
    if not image_name:
        raise HTTPException(status_code=404, detail="No watermark configured")

    image_file = NAS_STORAGE / image_name
    if not image_file.exists():
        raise HTTPException(status_code=404, detail="Watermark image not found")

    # Map the (lower-cased) extension to a media type
    suffix = image_file.suffix.lower()
    known_types = (
        ((".png",), "image/png"),
        ((".jpg", ".jpeg"), "image/jpeg"),
        ((".webp",), "image/webp"),
        ((".svg",), "image/svg+xml"),
        ((".gif",), "image/gif"),
    )
    media_type = next(
        (mime for extensions, mime in known_types if suffix in extensions),
        "application/octet-stream",
    )
    return FileResponse(image_file, media_type=media_type)
async def upload_watermark_image(
request: Request,
file: UploadFile = File(...),
):
"""
Upload a new watermark image.
The image is saved to the configured watermark path (VLOG_WATERMARK_IMAGE).
If no path is configured, saves to 'watermark.png' in NAS_STORAGE.
Accepts: PNG, JPEG, WebP, SVG, GIF (max 10MB)
For best results, use a PNG with transparency.
Note: After uploading, you must set VLOG_WATERMARK_ENABLED=true and
restart the services for the watermark to appear.
"""
# Validate file extension
if not file.filename:
raise HTTPException(status_code=400, detail="No filename provided")
ext = Path(file.filename).suffix.lower()
allowed_extensions = {".png", ".jpg", ".jpeg", ".webp", ".svg", ".gif"}
if ext not in allowed_extensions:
raise HTTPException(
status_code=400,
detail=f"Unsupported image format. Allowed: {', '.join(sorted(allowed_extensions))}",
)
# Check file Repobility · code-quality intelligence platform · https://repobility.com
delete_watermark_image function · python · L6017-L6055 (39 LOC)api/admin.py
async def delete_watermark_image(request: Request):
"""
Delete the current watermark image.
Removes the watermark file from storage. You should also set
'watermark.enabled' to false in Settings to disable the watermark overlay.
"""
# Get watermark settings from database with caching
settings = await get_watermark_settings()
if not settings["image"]:
raise HTTPException(status_code=404, detail="No watermark configured")
watermark_path = NAS_STORAGE / settings["image"]
if not watermark_path.exists():
raise HTTPException(status_code=404, detail="Watermark image not found")
try:
watermark_path.unlink()
# Audit log
log_audit(
AuditAction.SETTINGS_CHANGE,
client_ip=get_real_ip(request),
user_agent=request.headers.get("user-agent"),
resource_type="watermark",
resource_id=None,
resource_name=settings["image"],
details={"actvalidate_safe_path function · python · L6069-L6101 (33 LOC)api/admin.py
def validate_safe_path(base: Path, user_path: str) -> Path:
"""
Validate that a path stays within the base directory.
Args:
base: The base directory that the path must be contained within
user_path: The user-provided path component
Returns:
The validated, resolved path
Raises:
HTTPException: If path traversal is detected
"""
if not user_path:
raise HTTPException(status_code=400, detail="Path cannot be empty")
# Check for obvious path traversal attempts
if ".." in user_path:
logger.warning(f"Path traversal attempt blocked: {user_path}")
raise HTTPException(status_code=400, detail="Invalid path")
try:
full_path = (base / user_path).resolve()
base_resolved = base.resolve()
# Verify the resolved path is within the base directory
full_path.relative_to(base_resolved)
return full_path
except (ValueError, OSError) as e:
logger.warning(f"Pathsanitize_svg function · python · L6104-L6151 (48 LOC)api/admin.py
def sanitize_svg(content: bytes) -> bytes:
"""
Sanitize SVG content by removing potentially dangerous elements and attributes.
Removes:
- <script> tags
- Event handler attributes (onclick, onload, onerror, etc.)
- javascript: URLs
- data: URLs (can contain scripts)
- <foreignObject> elements (can embed HTML)
Args:
content: Raw SVG file content
Returns:
Sanitized SVG content
"""
try:
# Decode to string for regex processing
svg_str = content.decode("utf-8")
# Remove script tags and their content
svg_str = re.sub(r"<script[^>]*>.*?</script>", "", svg_str, flags=re.IGNORECASE | re.DOTALL)
svg_str = re.sub(r"<script[^>]*/>", "", svg_str, flags=re.IGNORECASE)
# Remove foreignObject elements (can embed arbitrary HTML)
svg_str = re.sub(r"<foreignObject[^>]*>.*?</foreignObject>", "", svg_str, flags=re.IGNORECASE | re.DOTALL)
svg_str = re.sub(r"<foreignObject[^>]*/sanitize_custom_css function · python · L6154-L6235 (82 LOC)api/admin.py
def sanitize_custom_css(css: str) -> str:
"""
Sanitize custom CSS to only allow CSS variable declarations.
This prevents CSS injection attacks by restricting custom CSS to:
- CSS custom property (variable) declarations
- Only in :root or body selectors
Blocks:
- url() values (prevent data exfiltration)
- @import rules
- expression() (IE-specific)
- javascript: URLs
- Arbitrary selectors (only :root and body allowed)
Args:
css: The custom CSS string to sanitize
Returns:
Sanitized CSS containing only safe variable declarations
"""
if not css or not css.strip():
return ""
# Remove comments (both block and potentially dangerous constructs)
css = re.sub(r"/\*.*?\*/", "", css, flags=re.DOTALL)
# Block dangerous patterns
dangerous_patterns = [
r"@import\b", # @import rules
r"expression\s*\(", # IE expression()
r"javascript\s*:", # javascript: URLs
r"behavalidate_footer_links function · python · L6238-L6303 (66 LOC)api/admin.py
def validate_footer_links(links: Any) -> list:
"""
Validate and sanitize footer links to prevent XSS via javascript: URLs.
Args:
links: The footer links data (should be a list of {label, url} objects)
Returns:
Validated list of footer links
Raises:
HTTPException: If links contain invalid or dangerous URLs
"""
if links is None:
return []
if not isinstance(links, list):
raise HTTPException(status_code=400, detail="Footer links must be a list")
validated = []
dangerous_protocols = ["javascript:", "data:", "vbscript:", "file:"]
for i, link in enumerate(links):
if not isinstance(link, dict):
raise HTTPException(status_code=400, detail=f"Footer link {i} must be an object")
label = link.get("label", "").strip()
url = link.get("url", "").strip()
if not label:
raise HTTPException(status_code=400, detail=f"Footer link {i} must have a label")
get_branding_settings function · python · L6306-L6352 (47 LOC)api/admin.py
async def get_branding_settings() -> Dict[str, Any]:
"""Get branding settings from database with caching and proper locking."""
import time
global _cached_branding_settings, _cached_branding_settings_time
now = time.time()
# Fast path: check cache without lock
if _cached_branding_settings and (now - _cached_branding_settings_time) < _BRANDING_SETTINGS_CACHE_TTL:
return _cached_branding_settings
# Slow path: acquire lock and refresh cache
async with _branding_cache_lock:
# Double-check after acquiring lock
now = time.time()
if _cached_branding_settings and (now - _cached_branding_settings_time) < _BRANDING_SETTINGS_CACHE_TTL:
return _cached_branding_settings
try:
from api.settings_service import get_settings_service
service = get_settings_service()
settings = {
"site_name": await service.get("branding.site_name", "VLog"),
"logo_paget_admin_branding_settings function · python · L6357-L6385 (29 LOC)api/admin.py
async def get_admin_branding_settings(request: Request):
"""
Get current branding configuration.
Returns the current branding settings from database.
"""
settings = await get_branding_settings()
logo_exists = False
if settings["logo_path"]:
logo_file = NAS_STORAGE / settings["logo_path"]
logo_exists = logo_file.exists()
favicon_exists = False
if settings["favicon_path"]:
favicon_file = NAS_STORAGE / settings["favicon_path"]
favicon_exists = favicon_file.exists()
return {
"site_name": settings["site_name"],
"logo_path": settings["logo_path"],
"logo_exists": logo_exists,
"logo_url": "/api/v1/settings/branding/logo" if logo_exists else None,
"favicon_path": settings["favicon_path"],
"favicon_exists": favicon_exists,
"favicon_url": "/api/v1/settings/branding/favicon" if favicon_exists else None,
"footer_text": settings["footer_text"],
"footer_lget_admin_logo_image function · python · L6390-L6420 (31 LOC)api/admin.py
async def get_admin_logo_image(request: Request):
"""Serve the logo image for admin preview."""
settings = await get_branding_settings()
if not settings["logo_path"]:
raise HTTPException(status_code=404, detail="No logo configured")
# Validate path is within storage directory (prevent path traversal)
logo_path = validate_safe_path(NAS_STORAGE, settings["logo_path"])
if not logo_path.exists():
raise HTTPException(status_code=404, detail="Logo image not found")
ext = logo_path.suffix.lower()
content_types = {
".png": "image/png",
".jpg": "image/jpeg",
".jpeg": "image/jpeg",
".webp": "image/webp",
".svg": "image/svg+xml",
".gif": "image/gif",
}
content_type = content_types.get(ext, "application/octet-stream")
return FileResponse(
logo_path,
media_type=content_type,
headers={
"X-Content-Type-Options": "nosniff",
"Cache-Control": "prMethodology: Repobility · https://repobility.com/research/state-of-ai-code-2026/
upload_logo_image function · python · L6425-L6543 (119 LOC)api/admin.py
async def upload_logo_image(
request: Request,
file: UploadFile = File(...),
):
"""
Upload a new logo image.
Accepts: PNG, JPEG, WebP, SVG, GIF (max 10MB)
For best results, use a PNG or SVG with transparency.
SVG files are automatically sanitized to remove scripts.
"""
if not file.filename:
raise HTTPException(status_code=400, detail="No filename provided")
ext = Path(file.filename).suffix.lower()
allowed_extensions = {".png", ".jpg", ".jpeg", ".webp", ".svg", ".gif"}
if ext not in allowed_extensions:
raise HTTPException(
status_code=400,
detail=f"Unsupported image format. Allowed: {', '.join(sorted(allowed_extensions))}",
)
content_length = request.headers.get("content-length")
if content_length and int(content_length) > MAX_THUMBNAIL_UPLOAD_SIZE:
raise HTTPException(
status_code=413,
detail=f"File too large. Maximum size: {MAX_THUMBNAIL_UPLOAD_SIZdelete_logo_image function · python · L6548-L6590 (43 LOC)api/admin.py
async def delete_logo_image(request: Request):
"""Delete the current logo image."""
settings = await get_branding_settings()
if not settings["logo_path"]:
raise HTTPException(status_code=404, detail="No logo configured")
# Validate path to prevent traversal attacks
logo_path = validate_safe_path(NAS_STORAGE, settings["logo_path"])
if not logo_path.exists():
raise HTTPException(status_code=404, detail="Logo image not found")
try:
logo_path.unlink()
log_audit(
AuditAction.SETTINGS_CHANGE,
client_ip=get_real_ip(request),
user_agent=request.headers.get("user-agent"),
resource_type="branding",
resource_id=None,
resource_name=settings["logo_path"],
details={"action": "logo_delete"},
)
# Clear the setting
from api.settings_service import get_settings_service
service = get_settings_service()
await service.sget_admin_favicon function · python · L6595-L6622 (28 LOC)api/admin.py
async def get_admin_favicon(request: Request):
    """
    Return the configured favicon file so the admin UI can preview it.

    Responds with 404 when the branding settings hold no favicon path, or
    when the resolved file does not exist. The stored path is passed through
    validate_safe_path against NAS_STORAGE before anything is served.
    """
    branding = await get_branding_settings()
    configured_path = branding["favicon_path"]
    if not configured_path:
        raise HTTPException(status_code=404, detail="No favicon configured")

    # Resolve inside the storage root; traversal attempts are rejected here
    icon_file = validate_safe_path(NAS_STORAGE, configured_path)
    if not icon_file.exists():
        raise HTTPException(status_code=404, detail="Favicon not found")

    # Pick the media type from the lower-cased extension
    suffix = icon_file.suffix.lower()
    if suffix == ".ico":
        media_type = "image/x-icon"
    elif suffix == ".png":
        media_type = "image/png"
    elif suffix == ".svg":
        media_type = "image/svg+xml"
    else:
        media_type = "application/octet-stream"

    response_headers = {
        "X-Content-Type-Options": "nosniff",
        "Cache-Control": "private, max-age=3600",
    }
    return FileResponse(icon_file, media_type=media_type, headers=response_headers)