Function bodies 1,000 total
link_oidc function · python · L573-L676 (104 LOC)api/auth/oidc.py
async def link_oidc(
request: Request,
code: str = Query(..., description="Authorization code from provider"),
state: str = Query(..., description="State parameter"),
user: dict = Depends(require_auth),
) -> dict:
"""
Link OIDC provider to existing account.
User must be logged in first.
"""
if not OIDC_ENABLED:
raise HTTPException(status_code=400, detail="OIDC is not enabled")
# Similar flow to callback, but link to existing user instead of creating/logging in
# This is a simplified version
now = datetime.now(timezone.utc)
# Validate state
stored_state = await database.fetch_one(
oidc_states.select()
.where(oidc_states.c.state == state)
.where(oidc_states.c.expires_at > now)
)
if not stored_state:
raise HTTPException(status_code=400, detail="Invalid or expired state")
await database.execute(
oidc_states.delete().where(oidc_states.c.id == stored_state["id"])
)
unlink_oidc function · python · L680-L714 (35 LOC)api/auth/oidc.py
async def unlink_oidc(
    user: dict = Depends(require_auth),
) -> dict:
    """
    Unlink the OIDC provider from the authenticated user's account.

    The account must have a password set; otherwise the request is
    rejected so the OIDC connection is not removed from a password-less
    account.

    Args:
        user: Authenticated user record injected by ``require_auth``.

    Returns:
        A dict with a confirmation ``message``.

    Raises:
        HTTPException: 404 if the user row cannot be found or no OIDC
            connection exists; 400 if the account has no password.
    """
    # Load the full user row so password_hash can be inspected.
    full_user = await database.fetch_one(
        users.select().where(users.c.id == user["id"])
    )
    # fetch_one yields nothing when the row is gone; fail with a clean
    # HTTP error instead of a TypeError from subscripting None.
    if full_user is None:
        raise HTTPException(status_code=404, detail="User not found")
    if not full_user["password_hash"]:
        raise HTTPException(
            status_code=400,
            detail="Cannot unlink OIDC without a password. Set a password first.",
        )
    # Delete connection; a result of 0 is treated as "nothing was linked".
    result = await database.execute(
        oidc_connections.delete().where(
            oidc_connections.c.user_id == user["id"]
        )
    )
    if result == 0:
        raise HTTPException(status_code=404, detail="No OIDC connection found")
    security_logger.info(
        "OIDC account unlinked",
        extra={"event": "oidc_unlinked", "user_id": user["id"]},
    )
    return {"message": "OIDC account unlinked"}


# hash_password function · python · L31-L41 (11 LOC)api/auth/password.py
def hash_password(password: str) -> str:
    """
    Produce a storable argon2id hash for a plaintext password.

    Args:
        password: The plaintext password to hash.

    Returns:
        The hash string produced by the module-level ``_password_hasher``
        (salt and parameters are embedded in the string).
    """
    hashed = _password_hasher.hash(password)
    return hashed


# verify_password function · python · L44-L65 (22 LOC)api/auth/password.py
def verify_password(password: str, password_hash: str) -> bool:
    """
    Check a plaintext password against a stored argon2id hash.

    Args:
        password: The plaintext password supplied by the user.
        password_hash: The stored hash to compare against.

    Returns:
        True when the password matches; False for a mismatch, an empty
        hash, or a stored hash that cannot be parsed.
    """
    if not password_hash:
        return False
    try:
        _password_hasher.verify(password_hash, password)
    except (VerifyMismatchError, InvalidHashError):
        # Wrong password, or a malformed hash in the database.
        return False
    return True


# needs_rehash function · python · L68-L86 (19 LOC)api/auth/password.py
def needs_rehash(password_hash: str) -> bool:
    """
    Report whether a stored hash should be regenerated.

    This is the case when the hasher's parameters have changed since the
    hash was created.

    Args:
        password_hash: The stored hash to inspect.

    Returns:
        True if the hash should be updated with the current parameters;
        False for an empty or unparseable hash.
    """
    if not password_hash:
        return False
    try:
        outdated = _password_hasher.check_needs_rehash(password_hash)
    except InvalidHashError:
        outdated = False
    return outdated


# validate_password_strength function · python · L89-L113 (25 LOC)api/auth/password.py
def validate_password_strength(password: str) -> Tuple[bool, str]:
    """
    Check a password against the minimum requirements.

    Args:
        password: The candidate password.

    Returns:
        ``(is_valid, error_message)``; the message is an empty string
        when the password is acceptable.
    """
    if not password:
        return False, "Password is required"
    if len(password) < PASSWORD_MIN_LENGTH:
        return False, f"Password must be at least {PASSWORD_MIN_LENGTH} characters"
    # One flag per character: True for letters, False for anything else.
    letter_flags = [ch.isalpha() for ch in password]
    # Reject passwords made only of letters or containing no letter at all.
    if all(letter_flags) or not any(letter_flags):
        return False, "Password must contain both letters and numbers/symbols"
    return True, ""


# generate_token function · python · L116-L126 (11 LOC)api/auth/password.py
def generate_token(length: int = 32) -> str:
    """
    Create a cryptographically secure random token.

    Args:
        length: Number of random bytes; the text output is roughly 1.3x
            longer because of the base64 encoding.

    Returns:
        A URL-safe base64-encoded token string.
    """
    token = secrets.token_urlsafe(nbytes=length)
    return token
hash_token function · python · L129-L141 (13 LOC)api/auth/password.py
def hash_token(token: str) -> str:
    """
    Produce an argon2id hash for a token.

    Intended for session tokens, refresh tokens, API keys and similar
    secrets.

    Args:
        token: The plaintext token to hash.

    Returns:
        The hash string produced by the module-level ``_password_hasher``.
    """
    hashed = _password_hasher.hash(token)
    return hashed


# verify_token function · python · L144-L162 (19 LOC)api/auth/password.py
def verify_token(token: str, token_hash: str) -> bool:
    """
    Check a plaintext token against a stored hash.

    Args:
        token: The plaintext token to verify.
        token_hash: The stored hash.

    Returns:
        True when the token matches; False for a mismatch, an invalid
        stored hash, or when either argument is empty.
    """
    if not (token and token_hash):
        return False
    try:
        _password_hasher.verify(token_hash, token)
    except (VerifyMismatchError, InvalidHashError):
        return False
    return True


# get_token_prefix function · python · L165-L176 (12 LOC)api/auth/password.py
def get_token_prefix(token: str, length: int = 8) -> str:
    """
    Return the leading characters of a token for efficient database lookup.

    Args:
        token: The token to take the prefix from.
        length: Number of characters to extract (default 8).

    Returns:
        The first ``length`` characters, or the whole token when it is
        shorter than that.
    """
    # Slicing already yields the full string when it is shorter than length.
    return token[:length]


# hash_token_fast function · python · L188-L201 (14 LOC)api/auth/password.py
def hash_token_fast(token: str) -> str:
    """
    Hash a token with SHA-256 (for session/API tokens only).

    Much cheaper than argon2id, and appropriate for high-entropy random
    tokens that cannot be brute-forced.

    Args:
        token: The plaintext token to hash.

    Returns:
        The SHA-256 digest as a hex string.
    """
    digest = hashlib.sha256()
    digest.update(token.encode())
    return digest.hexdigest()


# verify_token_fast function · python · L204-L222 (19 LOC)api/auth/password.py
def verify_token_fast(token: str, token_hash: str) -> bool:
    """
    Check a token against a SHA-256 hash using constant-time comparison.

    Args:
        token: The plaintext token to verify.
        token_hash: The stored SHA-256 hash.

    Returns:
        True when the token matches; False otherwise, including when
        either argument is empty or the comparison raises.
    """
    if not (token and token_hash):
        return False
    try:
        matches = hmac.compare_digest(hash_token_fast(token), token_hash)
    except Exception:
        # Best-effort: any hashing/comparison failure counts as "no match".
        return False
    return matches


# is_sha256_hash function · python · L225-L240 (16 LOC)api/auth/password.py
def is_sha256_hash(hash_value: str) -> bool:
    """
    Check whether a stored hash looks like a SHA-256 hex digest.

    Used to decide between fast (SHA-256) and slow (argon2) verification
    during migration. A value qualifies only if it is exactly 64
    characters long and every character is a hexadecimal digit; argon2
    hashes (which start with ``$argon2``) therefore never qualify.

    Args:
        hash_value: The stored hash to inspect.

    Returns:
        True if the value is a 64-character hex string, False otherwise
        (including for empty or ``None`` input).
    """
    if not hash_value or len(hash_value) != 64:
        return False
    # Length alone is not enough: require actual hex digits so arbitrary
    # 64-character strings are not misclassified as SHA-256 digests.
    hex_chars = frozenset("0123456789abcdefABCDEF")
    return set(hash_value) <= hex_chars


# get_role_permissions function · python · L163-L179 (17 LOC)api/auth/permissions.py
def get_role_permissions(role: Role) -> FrozenSet[Permission]:
    """
    Look up the permissions granted to a role.

    Args:
        role: The user's role; a string is converted to ``Role`` first.

    Returns:
        Frozen set of permissions for the role, or an empty frozen set
        when the string is not a valid role or the role has no entry.
    """
    try:
        resolved = Role(role) if isinstance(role, str) else role
    except ValueError:
        # Unknown role string: grant nothing.
        return frozenset()
    return _ROLE_PERMISSIONS.get(resolved, frozenset())


# has_permission function · python · L182-L194 (13 LOC)api/auth/permissions.py
def has_permission(role: Role, permission: Permission) -> bool:
    """
    Tell whether a role grants a specific permission.

    Args:
        role: The user's role.
        permission: The permission to look for.

    Returns:
        True if the role's permission set contains ``permission``.
    """
    return permission in get_role_permissions(role)
check_ownership_permission function · python · L197-L231 (35 LOC)api/auth/permissions.py
def check_ownership_permission(
role: Role,
permission: Permission,
owner_id: str | None,
user_id: str,
) -> bool:
"""
Check if a user has permission, considering ownership.
For "any" permissions (like VIDEO_UPDATE_ANY), returns True if user has that permission.
For regular permissions, returns True only if user owns the resource.
Args:
role: The user's role
permission: The permission to check (e.g., VIDEO_UPDATE)
owner_id: The owner ID of the resource (can be None for unowned resources)
user_id: The current user's ID
Returns:
True if the user has permission for this resource
"""
permissions = get_role_permissions(role)
# Check for "any" permission (admin-level)
any_permission_name = f"{permission.value}:any"
for perm in permissions:
if perm.value == any_permission_name:
return True
# Check regular permission + ownership
if permission not in permissions:
get_role_display_name function · python · L239-L251 (13 LOC)api/auth/permissions.py
def get_role_display_name(role: Role) -> str:
    """Return the human-readable label for a role."""
    if isinstance(role, str):
        try:
            role = Role(role)
        except ValueError:
            # Not a known role: hand the raw string back unchanged.
            return role
    labels = {
        Role.ADMIN: "Administrator",
        Role.EDITOR: "Editor",
        Role.VIEWER: "Viewer",
    }
    # Roles without a label fall back to their enum value.
    return labels.get(role, role.value)


# create_user_session function · python · L59-L135 (77 LOC)api/auth/sessions.py
async def create_user_session(
user_id: str,
ip_address: Optional[str] = None,
user_agent: Optional[str] = None,
) -> Tuple[str, str, datetime, datetime]:
"""
Create a new session for a user.
Uses a transaction to ensure session limit enforcement and creation
are atomic, preventing race conditions from concurrent logins.
Args:
user_id: The user's ID
ip_address: Client IP address for audit
user_agent: Client user agent for audit
Returns:
Tuple of (session_token, refresh_token, expires_at, refresh_expires_at)
"""
now = datetime.now(timezone.utc)
expires_at = now + timedelta(hours=USER_SESSION_EXPIRY_HOURS)
refresh_expires_at = now + timedelta(days=USER_REFRESH_TOKEN_EXPIRY_DAYS)
# Generate tokens
session_token = generate_token(48) # 64-char token
refresh_token = generate_token(48)
refresh_family_id = str(uuid.uuid4())
# Hash tokens for storage using SHA-256 (fast, tokens are highvalidate_session_token function · python · L138-L231 (94 LOC)api/auth/sessions.py
async def validate_session_token(
session_token: str,
allow_grace_period: bool = False,
) -> Optional[dict]:
"""
Validate a session token and return the user.
Args:
session_token: The session token to validate
allow_grace_period: Whether to allow expired sessions within grace period
Returns:
User record as dict if valid, None otherwise
"""
if not session_token or len(session_token) < 8:
return None
now = datetime.now(timezone.utc)
# Use prefix for efficient indexed lookup
token_prefix = get_token_prefix(session_token)
# Query sessions with matching prefix (typically 1 match)
sessions = await database.fetch_all(
user_sessions.select().where(
user_sessions.c.token_prefix == token_prefix,
user_sessions.c.revoked_at.is_(None),
)
)
for session in sessions:
token_hash = session["token_hash"]
# Support both SHA-256 (new) and argon2id (legacyrefresh_user_session function · python · L234-L394 (161 LOC)api/auth/sessions.py
async def refresh_user_session(
refresh_token: str,
ip_address: Optional[str] = None,
user_agent: Optional[str] = None,
) -> Tuple[str, str, datetime, datetime]:
"""
Refresh a session using a refresh token.
Implements token rotation with family tracking for theft detection.
Args:
refresh_token: The refresh token
ip_address: Client IP address for new session
user_agent: Client user agent for new session
Returns:
Tuple of (new_session_token, new_refresh_token, expires_at, refresh_expires_at)
Raises:
SessionExpiredError: If refresh token is expired
SessionRevokedError: If session was revoked
RefreshTokenReusedError: If token was already rotated (potential theft)
"""
if not refresh_token or len(refresh_token) < 8:
raise SessionError("Invalid refresh token")
now = datetime.now(timezone.utc)
# Use prefix for efficient indexed lookup
refresh_prefix = get_token_prefix(rinvalidate_session function · python · L397-L414 (18 LOC)api/auth/sessions.py
async def invalidate_session(session_id: str) -> bool:
    """
    Revoke one session by ID.

    Args:
        session_id: The session ID to invalidate.

    Returns:
        True if the update reported more than zero affected rows, i.e. a
        not-yet-revoked session with this ID was found.
    """
    revoke_stmt = (
        user_sessions.update()
        .where(user_sessions.c.id == session_id)
        .where(user_sessions.c.revoked_at.is_(None))
        .values(revoked_at=datetime.now(timezone.utc))
    )
    affected = await database.execute(revoke_stmt)
    return affected > 0


# invalidate_user_sessions function · python · L417-L439 (23 LOC)api/auth/sessions.py
async def invalidate_user_sessions(user_id: str, except_session_id: Optional[str] = None) -> int:
    """
    Revoke every active session belonging to a user.

    Args:
        user_id: The user's ID.
        except_session_id: Optional session ID to leave untouched
            (for "log out everywhere else").

    Returns:
        The value returned by the update, used as the number of sessions
        invalidated.
    """
    conditions = [
        user_sessions.c.user_id == user_id,
        user_sessions.c.revoked_at.is_(None),
    ]
    if except_session_id:
        conditions.append(user_sessions.c.id != except_session_id)

    stmt = user_sessions.update()
    for condition in conditions:
        stmt = stmt.where(condition)
    stmt = stmt.values(revoked_at=datetime.now(timezone.utc))
    return await database.execute(stmt)


# get_user_sessions function · python · L442-L459 (18 LOC)api/auth/sessions.py
async def get_user_sessions(user_id: str) -> list[dict]:
    """
    Fetch the active sessions of a user, newest first.

    A session counts as active when it is not revoked and its
    ``expires_at`` lies in the future.

    Args:
        user_id: The user's ID.

    Returns:
        List of session records as plain dicts.
    """
    now = datetime.now(timezone.utc)
    active_query = (
        user_sessions.select()
        .where(user_sessions.c.user_id == user_id)
        .where(user_sessions.c.revoked_at.is_(None))
        .where(user_sessions.c.expires_at > now)
        .order_by(user_sessions.c.created_at.desc())
    )
    rows = await database.fetch_all(active_query)
    return list(map(dict, rows))
cleanup_expired_sessions function · python · L462-L488 (27 LOC)api/auth/sessions.py
async def cleanup_expired_sessions() -> int:
    """
    Delete sessions that are no longer needed.

    A row is removed when its refresh token has expired, or when it was
    revoked more than 7 days ago (revoked rows are kept that long as an
    audit trail).

    Returns:
        Number of sessions deleted, as reported by the delete statement.
    """
    now = datetime.now(timezone.utc)
    audit_cutoff = now - timedelta(days=7)  # Keep audit trail for 7 days

    refresh_expired = user_sessions.c.refresh_expires_at < now
    revoked_long_ago = (user_sessions.c.revoked_at.isnot(None)) & (
        user_sessions.c.revoked_at < audit_cutoff
    )
    purge_stmt = user_sessions.delete().where(refresh_expired | revoked_long_ago)

    result = await database.execute(purge_stmt)
    if result > 0:
        logger.info(f"Cleaned up {result} expired sessions")
    return result


# _enforce_session_limit function · python · L491-L523 (33 LOC)api/auth/sessions.py
async def _enforce_session_limit(user_id: str) -> None:
"""
Enforce maximum sessions per user by revoking oldest sessions.
Uses row-level locking (FOR UPDATE) to prevent race conditions
when multiple concurrent logins occur for the same user.
"""
now = datetime.now(timezone.utc)
# Use FOR UPDATE to lock rows and prevent concurrent session creation
# from exceeding the limit. The lock is held until transaction commits.
# Note: This query uses raw SQL for FOR UPDATE support
active_sessions = await database.fetch_all(
"""
SELECT id, created_at FROM user_sessions
WHERE user_id = :user_id
AND revoked_at IS NULL
AND expires_at > :now
ORDER BY created_at DESC
FOR UPDATE
""",
{"user_id": user_id, "now": now},
)
if len(active_sessions) >= USER_MAX_SESSIONS:
# Revoke oldest sessions to make room (keep newest USER_MAX_SESSIONS - 1)
sessions_to_revoke = act_revoke_session_family function · python · L526-L539 (14 LOC)api/auth/sessions.py
async def _revoke_session_family(family_id: str) -> int:
    """
    Revoke every not-yet-revoked session in a refresh-token family.

    Used when token theft is detected.

    Args:
        family_id: The ``refresh_family_id`` shared by the sessions.

    Returns:
        The value returned by the update statement.
    """
    revoke_stmt = (
        user_sessions.update()
        .where(user_sessions.c.refresh_family_id == family_id)
        .where(user_sessions.c.revoked_at.is_(None))
        .values(revoked_at=datetime.now(timezone.utc))
    )
    return await database.execute(revoke_stmt)


# _validate_role function · python · L83-L90 (8 LOC)api/auth/users.py
def _validate_role(role: str) -> None:
    """
    Ensure ``role`` is one of the values defined by ``Role``.

    Raises:
        HTTPException: 400 listing the allowed values when it is not.
    """
    valid_roles = [member.value for member in Role]
    if role in valid_roles:
        return
    raise HTTPException(
        status_code=400,
        detail=f"Invalid role. Must be one of: {', '.join(valid_roles)}",
    )


# _validate_status function · python · L93-L100 (8 LOC)api/auth/users.py
def _validate_status(status: str) -> None:
    """
    Ensure ``status`` is one of the accepted account statuses.

    Raises:
        HTTPException: 400 listing the allowed values when it is not.
    """
    valid_statuses = ("active", "disabled", "pending")
    if status in valid_statuses:
        return
    raise HTTPException(
        status_code=400,
        detail=f"Invalid status. Must be one of: {', '.join(valid_statuses)}",
    )


# list_users function · python · L109-L172 (64 LOC)api/auth/users.py
async def list_users(
limit: int = Query(default=50, ge=1, le=100),
offset: int = Query(default=0, ge=0),
role: Optional[str] = Query(default=None),
status: Optional[str] = Query(default=None),
search: Optional[str] = Query(default=None),
current_user: dict = Depends(require_permission(Permission.USER_READ)),
) -> UserListResponse:
"""
List all users with optional filtering.
Requires user:read permission (admin only).
"""
query = users.select()
# Apply filters
if role:
_validate_role(role)
query = query.where(users.c.role == role)
if status:
_validate_status(status)
query = query.where(users.c.status == status)
if search:
search_pattern = f"%{search}%"
query = query.where(
(users.c.username.ilike(search_pattern))
| (users.c.email.ilike(search_pattern))
| (users.c.display_name.ilike(search_pattern))
)
# Get total count
from create_user function · python · L176-L252 (77 LOC)api/auth/users.py
async def create_user(
body: CreateUserRequest,
current_user: dict = Depends(require_permission(Permission.USER_CREATE)),
) -> UserResponse:
"""
Create a new user.
Requires user:create permission (admin only).
"""
_validate_role(body.role)
# Check username uniqueness
existing = await database.fetch_one(
users.select().where(users.c.username == body.username.lower())
)
if existing:
raise HTTPException(status_code=400, detail="Username already exists")
# Check email uniqueness
existing = await database.fetch_one(
users.select().where(users.c.email == body.email.lower())
)
if existing:
raise HTTPException(status_code=400, detail="Email already exists")
# Hash password if provided
password_hash = None
if body.password:
is_valid, error = validate_password_strength(body.password)
if not is_valid:
raise HTTPException(status_code=400, detail=error)
pasget_user function · python · L256-L282 (27 LOC)api/auth/users.py
async def get_user(
    user_id: str,
    current_user: dict = Depends(require_permission(Permission.USER_READ)),
) -> UserResponse:
    """
    Return the details of a single user.

    Requires user:read permission (admin only).

    Raises:
        HTTPException: 404 when no user has the given ID.
    """
    lookup = users.select().where(users.c.id == user_id)
    user = await database.fetch_one(lookup)
    if not user:
        raise HTTPException(status_code=404, detail="User not found")

    # Columns copied one-to-one from the row into the response model.
    response_fields = (
        "id",
        "username",
        "email",
        "display_name",
        "avatar_url",
        "role",
        "status",
        "email_verified",
        "created_at",
        "updated_at",
        "last_login_at",
    )
    return UserResponse(**{field: user[field] for field in response_fields})
update_user function · python · L286-L382 (97 LOC)api/auth/users.py
async def update_user(
user_id: str,
body: UpdateUserRequest,
current_user: dict = Depends(require_permission(Permission.USER_UPDATE)),
) -> UserResponse:
"""
Update user details.
Requires user:update permission (admin only).
"""
user = await database.fetch_one(users.select().where(users.c.id == user_id))
if not user:
raise HTTPException(status_code=404, detail="User not found")
updates = {"updated_at": datetime.now(timezone.utc)}
if body.username is not None:
# Check uniqueness
existing = await database.fetch_one(
users.select()
.where(users.c.username == body.username.lower())
.where(users.c.id != user_id)
)
if existing:
raise HTTPException(status_code=400, detail="Username already exists")
updates["username"] = body.username.lower()
if body.email is not None:
existing = await database.fetch_one(
users.select()
delete_user function · python · L386-L430 (45 LOC)api/auth/users.py
async def delete_user(
user_id: str,
current_user: dict = Depends(require_permission(Permission.USER_DELETE)),
) -> dict:
"""
Delete or disable a user.
Currently implements soft-delete by setting status to 'disabled'.
Requires user:delete permission (admin only).
"""
user = await database.fetch_one(users.select().where(users.c.id == user_id))
if not user:
raise HTTPException(status_code=404, detail="User not found")
# Prevent self-deletion
if user_id == current_user["id"]:
raise HTTPException(
status_code=400,
detail="Cannot delete your own account.",
)
# Soft delete - disable the account
await database.execute(
users.update()
.where(users.c.id == user_id)
.values(
status="disabled",
updated_at=datetime.now(timezone.utc),
)
)
# Invalidate all sessions
await invalidate_user_sessions(user_id)
security_logger.infoforce_password_reset function · python · L434-L499 (66 LOC)api/auth/users.py
async def force_password_reset(
user_id: str,
current_user: dict = Depends(require_permission(Permission.USER_UPDATE)),
) -> dict:
"""
Force a password reset for a user.
Generates a password reset token and invalidates all sessions.
Requires user:update permission (admin only).
"""
user = await database.fetch_one(users.select().where(users.c.id == user_id))
if not user:
raise HTTPException(status_code=404, detail="User not found")
if not user["password_hash"]:
raise HTTPException(
status_code=400,
detail="Cannot reset password for SSO-only account",
)
# Generate reset token
from api.auth.password import generate_token, hash_token
token = generate_token(32)
token_hash = hash_token(token)
now = datetime.now(timezone.utc)
from config import PASSWORD_RESET_EXPIRY_HOURS
from datetime import timedelta
expires_at = now + timedelta(hours=PASSWORD_RESET_EXPIRY_HOURS)
_sanitize_chapter_title function · python · L53-L78 (26 LOC)api/chapter_detection.py
def _sanitize_chapter_title(title: str, chapter_num: int) -> str:
    """
    Sanitize a chapter title from untrusted sources (video metadata).

    - Escapes HTML entities to prevent XSS
    - Truncates to ``MAX_CHAPTER_TITLE_LENGTH`` without splitting an
      HTML entity in half
    - Provides a fallback for empty/blank titles

    Args:
        title: Raw title from video metadata.
        chapter_num: Chapter number used for the fallback title.

    Returns:
        Sanitized title, at most ``MAX_CHAPTER_TITLE_LENGTH`` characters
        long (ellipsis included when truncated).
    """
    if not title or not title.strip():
        return f"Chapter {chapter_num}"
    # Strip whitespace and escape HTML entities.
    sanitized = html.escape(title.strip())
    if len(sanitized) <= MAX_CHAPTER_TITLE_LENGTH:
        return sanitized
    # Leave room for the "..." suffix.
    truncated = sanitized[: MAX_CHAPTER_TITLE_LENGTH - 3]
    # html.escape() turns every literal "&" into an entity, so an "&" with
    # no ";" after it means the cut landed inside an entity (e.g. "&am").
    # Drop the partial entity rather than emit broken markup.
    last_amp = truncated.rfind("&")
    if last_amp != -1 and ";" not in truncated[last_amp:]:
        truncated = truncated[:last_amp]
    return truncated + "..."


# extract_chapters_from_metadata function · python · L81-L190 (110 LOC)api/chapter_detection.py
async def extract_chapters_from_metadata(
video_id: int,
timeout: float = 30.0,
) -> List[InternalDetectedChapter]:
"""
Extract chapter markers from video file metadata using ffprobe.
Supports common chapter formats:
- Matroska (MKV) chapters
- MP4/MOV chapters
- Other container formats with embedded chapter metadata
Args:
video_id: Database ID of the video (used to find upload file)
timeout: Maximum time to wait for ffprobe
Returns:
List of InternalDetectedChapter objects, may be empty if no chapters found
Raises:
RuntimeError: If ffprobe fails or times out
"""
# Find the source video file
source_path = _find_source_video(video_id)
if source_path is None:
return []
# Run ffprobe to extract chapters
cmd = [
"ffprobe",
"-v",
"quiet",
"-print_format",
"json",
"-show_chapters",
str(source_path),
]
process = await _find_source_video function · python · L193-L210 (18 LOC)api/chapter_detection.py
def _find_source_video(video_id: int) -> Optional[Path]:
    """
    Find the source video file for chapter extraction.

    Tries ``UPLOADS_DIR / f"{video_id}{ext}"`` for each extension in
    ``SUPPORTED_VIDEO_EXTENSIONS`` and returns the first non-empty match.

    Args:
        video_id: Database ID of the video.

    Returns:
        Path to the source file, or None if no non-empty candidate exists.
    """
    for ext in SUPPORTED_VIDEO_EXTENSIONS:
        candidate = UPLOADS_DIR / f"{video_id}{ext}"
        try:
            # A single stat() replaces exists() + stat(): the file can be
            # removed between the two calls, which used to raise here.
            size = candidate.stat().st_size
        except OSError:
            # Missing or inaccessible candidate: try the next extension.
            continue
        if size > 0:
            return candidate
    return None


# generate_chapters_from_transcription function · python · L213-L257 (45 LOC)api/chapter_detection.py
async def generate_chapters_from_transcription(
transcript_text: str,
video_duration: float,
min_chapter_length: int = 60,
) -> List[InternalDetectedChapter]:
"""
Generate chapter suggestions from transcription text.
Algorithm:
1. Calculate how many chapters fit (video_duration / min_chapter_length)
2. Cap at MAX_AUTO_GENERATED_CHAPTERS to avoid overwhelming the UI
3. Divide video into equal time segments
4. Distribute transcript sentences evenly across segments
5. Use first sentence of each segment as chapter title (cleaned and truncated)
This simple time-based approach works because most long-form content
naturally divides into roughly equal sections.
Args:
transcript_text: Full transcript text from Whisper
video_duration: Total video duration in seconds
min_chapter_length: Minimum seconds between chapters
Returns:
List of InternalDetectedChapter objects
"""
if not transcript_text _split_into_sentences function · python · L260-L275 (16 LOC)api/chapter_detection.py
def _split_into_sentences(text: str) -> List[str]:
    """
    Break text into sentences at punctuation boundaries.

    Splitting is delegated to ``_SENTENCE_SPLIT_PATTERN`` (periods,
    exclamation marks and question marks). Abbreviations, decimal numbers
    and ellipses are NOT handled; this is intentionally simple for
    transcript text.

    Args:
        text: Text to split.

    Returns:
        List of non-empty, whitespace-trimmed sentence strings.
    """
    trimmed = (piece.strip() for piece in _SENTENCE_SPLIT_PATTERN.split(text))
    return [piece for piece in trimmed if piece]
_analyze_transcript_for_chapters function · python · L278-L342 (65 LOC)api/chapter_detection.py
def _analyze_transcript_for_chapters(
sentences: List[str],
video_duration: float,
min_chapter_length: int,
) -> List[InternalDetectedChapter]:
"""
Analyze transcript sentences to suggest chapter boundaries.
Divides the video into equal time segments and assigns
representative titles from the transcript sentences.
Args:
sentences: List of sentences from the transcript
video_duration: Total video duration in seconds
min_chapter_length: Minimum chapter length in seconds
Returns:
List of suggested chapters
"""
if video_duration < min_chapter_length:
return []
# Calculate how many chapters we can have
max_chapters = int(video_duration // min_chapter_length)
if max_chapters < 2:
# Not enough duration for multiple chapters
return []
# Limit to reasonable number of chapters
num_chapters = min(max_chapters, MAX_AUTO_GENERATED_CHAPTERS)
# Distribute sentences across _truncate_at_word_boundary function · python · L345-L370 (26 LOC)api/chapter_detection.py
def _truncate_at_word_boundary(text: str, max_length: int) -> str:
    """
    Shorten text to ``max_length`` characters, preferring a word boundary.

    When the text is cut, '...' is appended. The cut moves back to the
    last space only if that space lies past the halfway point, so the
    result does not become very short.

    Args:
        text: Text to truncate.
        max_length: Maximum length (not including the ellipsis).

    Returns:
        The original text if it already fits, otherwise the shortened
        text followed by '...'.
    """
    if len(text) <= max_length:
        return text
    head = text[:max_length]
    space_at = head.rfind(" ")
    # Only break at the word if it's not too early.
    if space_at > max_length // 2:
        head = head[:space_at]
    return head + "..."


# _generate_chapter_title function · python · L373-L402 (30 LOC)api/chapter_detection.py
def _generate_chapter_title(sentence: str, chapter_num: int) -> str:
    """
    Derive a chapter title from a transcript sentence.

    Applies every pattern in ``_FILLER_PATTERNS``, truncates to
    ``DISPLAY_TITLE_LENGTH``, then strips trailing punctuation.

    Args:
        sentence: Source sentence for the title.
        chapter_num: Chapter number (used for the fallback title).

    Returns:
        The generated title, or ``"Section <n>"`` when the result is
        shorter than ``MIN_TITLE_LENGTH``.
    """
    candidate = sentence
    # Remove common filler words at the start.
    for filler in _FILLER_PATTERNS:
        candidate = filler.sub("", candidate)
    candidate = _truncate_at_word_boundary(candidate, DISPLAY_TITLE_LENGTH)
    # Clean up punctuation at the end, then surrounding whitespace.
    candidate = _PUNCTUATION_END_PATTERN.sub("", candidate).strip()
    if len(candidate) < MIN_TITLE_LENGTH:
        return f"Section {chapter_num}"
    return candidate


# filter_chapters_by_length function · python · L405-L448 (44 LOC)api/chapter_detection.py
def filter_chapters_by_length(
chapters: List[InternalDetectedChapter],
min_chapter_length: int,
video_duration: float,
) -> List[InternalDetectedChapter]:
"""
Filter out chapters that are too short or too close together.
Args:
chapters: List of detected chapters
min_chapter_length: Minimum seconds between chapters
video_duration: Total video duration
Returns:
Filtered list of chapters
"""
if not chapters:
return []
# Sort by start time
sorted_chapters = sorted(chapters, key=lambda c: c.start_time)
filtered = []
last_start_time: Optional[float] = None
for chapter in sorted_chapters:
# Always include first chapter
if last_start_time is None:
filtered.append(chapter)
last_start_time = chapter.start_time
continue
# Skip if too close to previous chapter
if chapter.start_time - last_start_time < min_chapter_length:
validate_slug function · python · L56-L77 (22 LOC)api/common.py
def validate_slug(slug: str) -> bool:
    r"""
    Check that a slug is safe: no path traversal, allowed characters only.

    Args:
        slug: The slug string to validate.

    Returns:
        True if the slug is valid, False otherwise.

    Security:
        - Rejects any slug containing ``..`` (../, ..\, etc.)
        - Requires a match against ``SLUG_PATTERN`` (lowercase
          alphanumeric with hyphens)
        - Defense in depth: slugs are generated server-side, but this
          validates user input
    """
    # Empty input and traversal sequences are rejected before the regex.
    if not slug or ".." in slug:
        return False
    return SLUG_PATTERN.match(slug) is not None


# require_valid_slug function · python · L80-L112 (33 LOC)api/common.py
def require_valid_slug(slug: str, resource_type: str = "resource") -> None:
"""
Validate slug or raise HTTPException with 400 status.
Security: Prevents path traversal attacks by ensuring slug contains
only safe characters (lowercase alphanumeric with hyphens).
Note: This function intentionally duplicates validate_slug() logic to provide
specific error messages for each failure case (missing, path traversal, format).
This improves API usability by helping clients understand exactly what's wrong.
Args:
slug: The slug string to validate
resource_type: Type of resource for error message (e.g., "video", "category")
Raises:
HTTPException: 400 error if slug is invalid
"""
if not slug:
raise HTTPException(
status_code=400,
detail=f"Missing {resource_type} slug"
)
if '..' in slug:
raise HTTPException(
status_code=400,
detail=f"Invalid {resource_typverify_stream_access function · python · L115-L151 (37 LOC)api/common.py
async def verify_stream_access(slug: str, user: dict) -> dict:
"""
Verify user has access to a stream (ownership or admin permission).
This is a shared utility to avoid duplicating stream access logic across
studio modules. Uses a 404 response for unauthorized access to prevent
enumeration attacks.
Args:
slug: Stream slug to verify access for
user: User dict from authenticated session
Returns:
Stream record as dict if access is granted
Raises:
HTTPException: 400 if slug format is invalid
HTTPException: 404 if stream not found or user doesn't have access
"""
require_valid_slug(slug, "stream")
stream = await fetch_one_with_retry(
live_streams.select().where(live_streams.c.slug == slug)
)
if not stream:
raise HTTPException(status_code=404, detail="Stream not found")
# Owner check OR admin permission
role = Role(user["role"])
is_owner = stream["owner_id"] == user["_get_storage_health_lock function · python · L164-L191 (28 LOC)api/common.py
def _get_storage_health_lock() -> asyncio.Lock:
"""Get or create the storage health cache lock.
The lock is created lazily to ensure it's bound to the correct event loop.
This is necessary because the lock may be used across different event loops
in testing or when the application restarts.
"""
global _storage_health_lock
if _storage_health_lock is None:
_storage_health_lock = asyncio.Lock()
return _storage_health_lock
# Check if the lock is bound to a different event loop
# by comparing the lock's internal loop (if accessible) with the current loop
try:
current_loop = asyncio.get_running_loop()
# Access the internal _loop attribute which exists on asyncio.Lock
# This is safer than calling _get_loop() which is more private
lock_loop = getattr(_storage_health_lock, '_loop', None)
if lock_loop is not None and lock_loop is not current_loop:
# Lock is from a different event loop, If a scraper extracted this row, it came from Repobility (https://repobility.com)
ensure_utc function · python · L194-L223 (30 LOC)api/common.py
def ensure_utc(dt: Optional[datetime]) -> Optional[datetime]:
"""
Ensure datetime is timezone-aware UTC.
This function ensures consistent timezone handling for datetime comparisons,
handling both timezone-aware and timezone-naive datetime objects.
Args:
dt: A datetime object (may be None, timezone-aware, or timezone-naive)
Returns:
- None if input is None
- UTC datetime if input was timezone-aware (converted to UTC if needed)
- UTC datetime if input was timezone-naive (assumed to be UTC)
Examples:
>>> ensure_utc(None)
None
>>> ensure_utc(datetime(2024, 1, 1, 12, 0, 0)) # naive
datetime(2024, 1, 1, 12, 0, 0, tzinfo=timezone.utc)
>>> ensure_utc(datetime(2024, 1, 1, 12, 0, 0, tzinfo=timezone.utc))
datetime(2024, 1, 1, 12, 0, 0, tzinfo=timezone.utc)
"""
if dt is None:
return None
if dt.tzinfo is None:
# Assume naive datetimes are UTC
return dtcalculate_stream_offset_ms function · python · L226-L253 (28 LOC)api/common.py
def calculate_stream_offset_ms(
stream_started_at: Optional[datetime],
current_time: datetime
) -> Optional[int]:
"""
Calculate milliseconds elapsed since stream start.
Handles timezone normalization for naive datetimes (assumes UTC).
Returns None if stream_started_at is None. Clamps result to non-negative
to protect against clock skew.
Args:
stream_started_at: When the stream started (may be timezone-naive, assumed UTC)
current_time: Current timestamp (may be timezone-naive, assumed UTC)
Returns:
Non-negative milliseconds since stream start, or None if stream_started_at is None
"""
if stream_started_at is None:
return None
# Normalize both timestamps to UTC to prevent TypeError on subtraction
started = ensure_utc(stream_started_at)
current = ensure_utc(current_time)
offset_ms = int((current - started).total_seconds() * 1000)
# Clamp to non-negative to protect against clock skew
retget_real_ip function · python · L256-L274 (19 LOC)api/common.py
def get_real_ip(request: Request) -> str:
    """
    Get the real client IP address, honoring X-Forwarded-For only from trusted proxies.

    Security: X-Forwarded-For is only consulted when the direct peer is in
    TRUSTED_PROXIES. The header is then read right-to-left and the first
    address that is NOT a trusted proxy is returned. The leftmost entry is
    not used on its own because a client can pre-populate the header and a
    proxy that appends to it would pass the forged value through, letting
    attackers bypass rate limiting.
    Configure VLOG_TRUSTED_PROXIES with your proxy IPs (e.g., "127.0.0.1,10.0.0.1").

    Args:
        request: The incoming request.

    Returns:
        The selected client IP. Falls back to the direct peer address when
        the peer is not a trusted proxy or the header is missing/empty, and
        to the leftmost header entry when every listed hop is a trusted proxy.
    """
    client_ip = get_remote_address(request)
    # Only trust X-Forwarded-For if the request came from a trusted proxy.
    if not TRUSTED_PROXIES or client_ip not in TRUSTED_PROXIES:
        return client_ip
    forwarded = request.headers.get("X-Forwarded-For")
    if not forwarded:
        return client_ip
    # "client, proxy1, proxy2, ..." -> drop blanks left by stray commas so
    # an empty string is never returned as an address.
    hops = [hop.strip() for hop in forwarded.split(",")]
    hops = [hop for hop in hops if hop]
    if not hops:
        return client_ip
    # Each proxy appends the peer it saw, so the rightmost entries come from
    # our own infrastructure; walk back to the first untrusted address.
    for hop in reversed(hops):
        if hop not in TRUSTED_PROXIES:
            return hop
    # Every hop is a trusted proxy: keep the earliest one.
    return hops[0]