Function bodies 1,000 total
RedisAnalyticsCache.cleanup_expired method · python · L420-L430 (11 LOC)api/analytics_cache.py
def cleanup_expired(self) -> int:
"""
Remove all expired entries from cache.
Redis handles TTL-based expiration automatically, so this is a no-op.
Returns:
Always returns 0 (Redis handles expiration)
"""
# Redis handles TTL expiration automatically
return 0RedisAnalyticsCache.get_stats method · python · L432-L467 (36 LOC)api/analytics_cache.py
def get_stats(self) -> Dict[str, Any]:
"""
Get cache statistics.
Returns:
Dict with cache stats (TTL, enabled status, backend type, connection status)
"""
def count_keys():
start_time = time.time()
entry_count = 0
cursor = 0
pattern = f"{self.CACHE_KEY_PREFIX}*"
while True:
# Check timeout to prevent blocking on large keyspaces
if time.time() - start_time > REDIS_BULK_OPERATION_TIMEOUT:
logger.warning(
f"Redis key count timed out after {REDIS_BULK_OPERATION_TIMEOUT}s, "
f"returning partial count: {entry_count}"
)
return entry_count
cursor, keys = self._client.scan(cursor, match=pattern, count=REDIS_SCAN_BATCH_SIZE)
entry_count += len(keys)
if cursor == 0:
break
create_analytics_cache function · python · L474-L509 (36 LOC)api/analytics_cache.py
def create_analytics_cache(
storage_url: str = "memory://",
ttl_seconds: int = DEFAULT_CACHE_TTL_SECONDS,
enabled: bool = True,
max_size: int = DEFAULT_CACHE_MAX_SIZE,
) -> AnalyticsCacheType:
"""
Factory function to create the appropriate analytics cache implementation.
Args:
storage_url: Storage backend URL. Use "memory://" for in-memory cache,
or a Redis URL like "redis://localhost:6379" for shared cache.
ttl_seconds: Time to live in seconds for cache entries
enabled: Whether caching is enabled
max_size: Maximum entries for in-memory cache (ignored for Redis)
Returns:
Either AnalyticsCache (memory) or RedisAnalyticsCache (Redis) instance
"""
if not enabled:
# Return disabled memory cache - simplest option
return AnalyticsCache(ttl_seconds=ttl_seconds, enabled=False, max_size=max_size)
if storage_url.startswith("redis://") or storage_url.startswith("rediss://"):
AuditLogger.__init__ method · python · L155-L161 (7 LOC)api/audit.py
def __init__(self):
self.logger = logging.getLogger("vlog.audit")
self.logger.setLevel(getattr(logging, AUDIT_LOG_LEVEL, logging.INFO))
self.logger.propagate = False # Don't propagate to root logger
if not self.logger.handlers:
self._setup_handlers()AuditLogger._setup_handlers method · python · L163-L187 (25 LOC)api/audit.py
def _setup_handlers(self):
"""Set up logging handlers with rotation support."""
formatter = logging.Formatter("%(message)s") # Raw JSON output
if AUDIT_LOG_ENABLED:
try:
# Use RotatingFileHandler for automatic log rotation
# Rotates when file reaches AUDIT_LOG_MAX_BYTES (default 10MB)
# Keeps AUDIT_LOG_BACKUP_COUNT backup files (default 5)
file_handler = RotatingFileHandler(
AUDIT_LOG_PATH,
maxBytes=AUDIT_LOG_MAX_BYTES,
backupCount=AUDIT_LOG_BACKUP_COUNT,
encoding="utf-8",
)
file_handler.setFormatter(formatter)
self.logger.addHandler(file_handler)
except (PermissionError, OSError):
# Fall back to console logging
console_handler = logging.StreamHandler()
console_handler.setFormatter(formatter)
AuditLogger.log method · python · L189-L247 (59 LOC)api/audit.py
def log(
self,
action: AuditAction,
client_ip: Optional[str] = None,
user_agent: Optional[str] = None,
resource_type: Optional[str] = None,
resource_id: Optional[Any] = None,
resource_name: Optional[str] = None,
details: Optional[dict] = None,
success: bool = True,
error: Optional[str] = None,
request_id: Optional[str] = None,
):
"""
Log an audit event.
Args:
action: The type of action being performed
client_ip: IP address of the client making the request
user_agent: User-Agent header from the request
resource_type: Type of resource (video, category, etc.)
resource_id: ID of the affected resource
resource_name: Human-readable name of the resource (slug, title, etc.)
details: Additional action-specific details
success: Whether the action succeeded
error: Error mlog_audit function · python · L254-L291 (38 LOC)api/audit.py
def log_audit(
action: AuditAction,
client_ip: Optional[str] = None,
user_agent: Optional[str] = None,
resource_type: Optional[str] = None,
resource_id: Optional[Any] = None,
resource_name: Optional[str] = None,
details: Optional[dict] = None,
success: bool = True,
error: Optional[str] = None,
request_id: Optional[str] = None,
):
"""
Convenience function for logging audit events.
Example usage:
log_audit(
AuditAction.VIDEO_UPLOAD,
client_ip=request.client.host,
resource_type="video",
resource_id=video_id,
resource_name=slug,
details={"title": title, "category_id": category_id},
request_id=get_request_id(request)
)
"""
audit_logger.log(
action=action,
client_ip=client_ip,
user_agent=user_agent,
resource_type=resource_type,
resource_id=resource_id,
resource_name=resource_name,
If a scraper extracted this row, it came from Repobility (https://repobility.com)
list_api_keys function · python · L72-L100 (29 LOC)api/auth/api_keys.py
async def list_api_keys(
user: dict = Depends(require_auth),
) -> ApiKeyListResponse:
"""
List all API keys for the current user.
Note: The actual key values are never returned after creation.
"""
keys = await database.fetch_all(
user_api_keys.select()
.where(user_api_keys.c.user_id == user["id"])
.where(user_api_keys.c.revoked_at.is_(None))
.order_by(user_api_keys.c.created_at.desc())
)
return ApiKeyListResponse(
keys=[
ApiKeyResponse(
id=k["id"],
name=k["name"],
key_prefix=k["key_prefix"],
expires_at=k["expires_at"],
last_used_at=k["last_used_at"],
created_at=k["created_at"],
)
for k in keys
],
total=len(keys),
)create_api_key function · python · L104-L158 (55 LOC)api/auth/api_keys.py
async def create_api_key(
body: CreateApiKeyRequest,
user: dict = Depends(require_auth),
) -> CreateApiKeyResponse:
"""
Create a new API key.
IMPORTANT: The key is only returned once. Store it securely.
"""
now = datetime.now(timezone.utc)
# Generate key
api_key = generate_token(32) # ~44 character URL-safe token
key_hash = hash_token_fast(api_key) # SHA-256 for fast verification
key_prefix = get_token_prefix(api_key)
# Calculate expiry
expires_at = None
if body.expires_in_days:
expires_at = now + timedelta(days=body.expires_in_days)
key_id = str(uuid.uuid4())
await database.execute(
user_api_keys.insert().values(
id=key_id,
user_id=user["id"],
name=body.name.strip(),
key_prefix=key_prefix,
key_hash=key_hash,
expires_at=expires_at,
created_at=now,
)
)
security_logger.info(
"API key created",
get_api_key function · python · L162-L184 (23 LOC)api/auth/api_keys.py
async def get_api_key(
key_id: str,
user: dict = Depends(require_auth),
) -> ApiKeyResponse:
"""Get details of a specific API key."""
key = await database.fetch_one(
user_api_keys.select()
.where(user_api_keys.c.id == key_id)
.where(user_api_keys.c.user_id == user["id"])
.where(user_api_keys.c.revoked_at.is_(None))
)
if not key:
raise HTTPException(status_code=404, detail="API key not found")
return ApiKeyResponse(
id=key["id"],
name=key["name"],
key_prefix=key["key_prefix"],
expires_at=key["expires_at"],
last_used_at=key["last_used_at"],
created_at=key["created_at"],
)revoke_api_key function · python · L188-L224 (37 LOC)api/auth/api_keys.py
async def revoke_api_key(
key_id: str,
user: dict = Depends(require_auth),
) -> dict:
"""
Revoke an API key.
The key will immediately stop working.
"""
key = await database.fetch_one(
user_api_keys.select()
.where(user_api_keys.c.id == key_id)
.where(user_api_keys.c.user_id == user["id"])
.where(user_api_keys.c.revoked_at.is_(None))
)
if not key:
raise HTTPException(status_code=404, detail="API key not found")
await database.execute(
user_api_keys.update()
.where(user_api_keys.c.id == key_id)
.values(revoked_at=datetime.now(timezone.utc))
)
security_logger.info(
"API key revoked",
extra={
"event": "api_key_revoked",
"user_id": user["id"],
"key_id": key_id,
"key_name": key["name"],
"key_prefix": key["key_prefix"],
},
)
return {"message": "API key revoked"}_set_session_cookies function · python · L186-L217 (32 LOC)api/auth/endpoints.py
def _set_session_cookies(
response: Response,
session_token: str,
refresh_token: str,
expires_at: datetime,
refresh_expires_at: datetime,
) -> None:
"""Set session and refresh token cookies."""
# Calculate max_age in seconds
now = datetime.now(timezone.utc)
session_max_age = int((expires_at - now).total_seconds())
refresh_max_age = int((refresh_expires_at - now).total_seconds())
response.set_cookie(
key=SESSION_COOKIE_NAME,
value=session_token,
max_age=session_max_age,
httponly=True,
secure=SECURE_COOKIES,
samesite="lax",
path="/",
)
response.set_cookie(
key=REFRESH_COOKIE_NAME,
value=refresh_token,
max_age=refresh_max_age,
httponly=True,
secure=SECURE_COOKIES,
samesite="lax",
path="/api/v1/auth/refresh", # Only sent to refresh endpoint
)_generate_csrf_token function · python · L226-L235 (10 LOC)api/auth/endpoints.py
def _generate_csrf_token(session_token: str) -> str:
"""Generate CSRF token from session token."""
if not SESSION_SECRET_KEY:
raise ValueError("SESSION_SECRET_KEY not configured")
return hmac.new(
SESSION_SECRET_KEY.encode(),
session_token.encode(),
"sha256",
).hexdigest()[:32]_increment_failed_login function · python · L238-L276 (39 LOC)api/auth/endpoints.py
async def _increment_failed_login(user_id: str) -> None:
"""Increment failed login count and potentially lock account."""
now = datetime.now(timezone.utc)
user = await database.fetch_one(
users.select().where(users.c.id == user_id)
)
if not user:
return
new_count = (user["failed_login_attempts"] or 0) + 1
if new_count >= LOGIN_LOCKOUT_THRESHOLD:
# Lock account
locked_until = now + timedelta(minutes=LOGIN_LOCKOUT_DURATION_MINUTES)
await database.execute(
users.update()
.where(users.c.id == user_id)
.values(
failed_login_attempts=new_count,
locked_until=locked_until,
)
)
security_logger.warning(
"Account locked due to failed logins",
extra={
"event": "account_locked",
"user_id": user_id,
"failed_attempts": new_count,
"locked_until": loc_reset_failed_login function · python · L279-L289 (11 LOC)api/auth/endpoints.py
async def _reset_failed_login(user_id: str) -> None:
"""Reset failed login count after successful login."""
await database.execute(
users.update()
.where(users.c.id == user_id)
.values(
failed_login_attempts=0,
locked_until=None,
last_login_at=datetime.now(timezone.utc),
)
)Same scanner, your repo: https://repobility.com — Repobility
get_setup_status function · python · L298-L325 (28 LOC)api/auth/endpoints.py
async def get_setup_status() -> SetupStatusResponse:
"""
Check if initial setup is required.
Returns needs_setup=True if no users exist in the system.
This endpoint is always public.
"""
try:
user_count = await database.fetch_val(
"SELECT COUNT(*) FROM users"
)
except Exception:
logger.exception("Database error checking setup status")
raise HTTPException(
status_code=503,
detail="Service temporarily unavailable. Please try again.",
)
if user_count == 0:
return SetupStatusResponse(
needs_setup=True,
message="No users exist. Please create an admin account to get started.",
)
return SetupStatusResponse(
needs_setup=False,
message="Setup complete. Please log in.",
)create_initial_admin function · python · L329-L426 (98 LOC)api/auth/endpoints.py
async def create_initial_admin(
request: Request,
response: Response,
body: SetupRequest,
) -> SetupResponse:
"""
Create the initial admin account.
This endpoint only works when no users exist in the system.
Once an admin is created, this endpoint returns 403.
"""
# Validate password strength first (before any DB operations)
is_valid, error = validate_password_strength(body.password)
if not is_valid:
raise HTTPException(status_code=400, detail=error)
now = datetime.now(timezone.utc)
user_id = str(uuid.uuid4())
password_hash = hash_password(body.password)
ip_address = _get_client_ip(request)
user_agent = request.headers.get("user-agent")
# Use transaction to prevent TOCTOU race condition:
# - Check user count and insert atomically
# - Unique constraints on username/email handle concurrent requests
try:
async with database.transaction():
# Check if any users exist (within transalogin function · python · L430-L570 (141 LOC)api/auth/endpoints.py
async def login(
request: Request,
response: Response,
body: LoginRequest,
) -> LoginResponse:
"""
Authenticate with username/email and password.
Sets HTTP-only session cookies on success.
"""
ip_address = _get_client_ip(request)
user_agent = request.headers.get("user-agent")
login_input = body.username_or_email.lower()
# Hash identifier for logging (avoid PII in logs)
identifier_hash = hashlib.sha256(login_input.encode()).hexdigest()[:16]
# Find user by email or username (case-insensitive)
try:
user = await database.fetch_one(
users.select().where(
or_(
func.lower(users.c.email) == login_input,
func.lower(users.c.username) == login_input,
)
)
)
except Exception as e:
logger.exception("Database error during login user lookup")
raise HTTPException(
status_code=503,
detail="Serlogout function · python · L574-L598 (25 LOC)api/auth/endpoints.py
async def logout(
request: Request,
response: Response,
user: dict = Depends(require_auth),
) -> dict:
"""
Log out and invalidate current session.
"""
session_id = user.get("session_id")
if session_id:
await invalidate_session(session_id)
_clear_session_cookies(response)
security_logger.info(
"Logout",
extra={
"event": "logout",
"user_id": user["id"],
"session_id": session_id,
"ip_address": _get_client_ip(request),
},
)
return {"message": "Logged out successfully"}refresh function · python · L602-L656 (55 LOC)api/auth/endpoints.py
async def refresh(
request: Request,
response: Response,
refresh_token: Optional[str] = Cookie(None, alias=REFRESH_COOKIE_NAME),
) -> LoginResponse:
"""
Refresh session using refresh token.
Implements token rotation - old tokens become invalid after use.
"""
if not refresh_token:
raise HTTPException(status_code=401, detail="No refresh token provided")
ip_address = _get_client_ip(request)
user_agent = request.headers.get("user-agent")
try:
new_session, new_refresh, expires_at, refresh_expires_at = await refresh_user_session(
refresh_token=refresh_token,
ip_address=ip_address,
user_agent=user_agent,
)
except RefreshTokenReusedError:
_clear_session_cookies(response)
raise HTTPException(
status_code=401,
detail="Session invalidated for security. Please log in again.",
)
except (SessionExpiredError, SessionRevokedError):
_check_auth function · python · L660-L702 (43 LOC)api/auth/endpoints.py
async def check_auth(
request: Request,
session_token: Optional[str] = Cookie(None, alias=SESSION_COOKIE_NAME),
) -> AuthCheckResponse:
"""
Check if the current session is valid.
Returns user info if authenticated, or authenticated=false if not.
"""
if not session_token:
return AuthCheckResponse(
authenticated=False,
oidc_enabled=OIDC_ENABLED,
oidc_provider_name=OIDC_PROVIDER_NAME,
)
user = await validate_session_token(session_token, allow_grace_period=True)
if not user:
return AuthCheckResponse(
authenticated=False,
oidc_enabled=OIDC_ENABLED,
oidc_provider_name=OIDC_PROVIDER_NAME,
)
# Get permissions for user's role
role_permissions = get_role_permissions(user["role"])
permissions = [p.value for p in role_permissions]
return AuthCheckResponse(
authenticated=True,
oidc_enabled=OIDC_ENABLED,
oidc_providerget_current_user_info function · python · L706-L718 (13 LOC)api/auth/endpoints.py
async def get_current_user_info(user: dict = Depends(require_auth)) -> dict:
"""Get current user profile."""
return {
"id": user["id"],
"username": user["username"],
"email": user["email"],
"display_name": user["display_name"],
"avatar_url": user["avatar_url"],
"role": user["role"],
"email_verified": user["email_verified"],
"created_at": user["created_at"].isoformat() if user["created_at"] else None,
"last_login_at": user["last_login_at"].isoformat() if user["last_login_at"] else None,
}update_profile function · python · L722-L753 (32 LOC)api/auth/endpoints.py
async def update_profile(
body: ProfileUpdateRequest,
user: dict = Depends(require_auth),
) -> dict:
"""Update current user profile."""
updates = {}
if body.display_name is not None:
updates["display_name"] = body.display_name.strip() if body.display_name else None
if body.avatar_url is not None:
updates["avatar_url"] = body.avatar_url.strip() if body.avatar_url else None
if updates:
updates["updated_at"] = datetime.now(timezone.utc)
await database.execute(
users.update().where(users.c.id == user["id"]).values(**updates)
)
# Return updated user
updated_user = await database.fetch_one(
users.select().where(users.c.id == user["id"])
)
return {
"id": updated_user["id"],
"username": updated_user["username"],
"email": updated_user["email"],
"display_name": updated_user["display_name"],
"avatar_url": updated_user["avatar_url"],
"role": uMethodology: Repobility · https://repobility.com/research/state-of-ai-code-2026/
change_password function · python · L757-L810 (54 LOC)api/auth/endpoints.py
async def change_password(
request: Request,
body: PasswordChangeRequest,
user: dict = Depends(require_auth),
) -> dict:
"""Change current user's password."""
# Verify current password
current_user = await database.fetch_one(
users.select().where(users.c.id == user["id"])
)
if not current_user["password_hash"]:
raise HTTPException(
status_code=400,
detail="Cannot change password for SSO-only account",
)
if not verify_password(body.current_password, current_user["password_hash"]):
security_logger.warning(
"Password change failed: invalid current password",
extra={
"event": "password_change_failed",
"user_id": user["id"],
"ip_address": _get_client_ip(request),
},
)
raise HTTPException(status_code=401, detail="Current password is incorrect")
# Validate new password
is_valid, error = validaforgot_password function · python · L814-L881 (68 LOC)api/auth/endpoints.py
async def forgot_password(
request: Request,
body: ForgotPasswordRequest,
) -> dict:
"""
Request password reset.
Always returns success to prevent email enumeration.
"""
# Check if password reset is enabled (requires email delivery to be configured)
if not PASSWORD_RESET_ENABLED:
raise HTTPException(
status_code=503,
detail="Password reset is not available. Please contact an administrator.",
)
ip_address = _get_client_ip(request)
now = datetime.now(timezone.utc)
# Always return success (constant-time response)
success_response = {"message": "If an account exists with this email, a reset link has been sent"}
# Find user
user = await database.fetch_one(
users.select().where(users.c.email == body.email.lower())
)
if not user:
# Simulate work to prevent timing attacks
hash_password("dummy")
return success_response
if not user["password_hash"]:
reset_password function · python · L885-L963 (79 LOC)api/auth/endpoints.py
async def reset_password(
request: Request,
body: ResetPasswordRequest,
) -> dict:
"""Reset password using token."""
# Check if password reset is enabled
if not PASSWORD_RESET_ENABLED:
raise HTTPException(
status_code=503,
detail="Password reset is not available. Please contact an administrator.",
)
ip_address = _get_client_ip(request)
now = datetime.now(timezone.utc)
# Validate new password
is_valid, error = validate_password_strength(body.new_password)
if not is_valid:
raise HTTPException(status_code=400, detail=error)
# Find token
all_tokens = await database.fetch_all(
password_reset_tokens.select()
.where(password_reset_tokens.c.used_at.is_(None))
.where(password_reset_tokens.c.expires_at > now)
)
from api.auth.password import verify_token
valid_token = None
for token_record in all_tokens:
if verify_token(body.token, token_record["tokelist_sessions function · python · L967-L985 (19 LOC)api/auth/endpoints.py
async def list_sessions(
request: Request,
user: dict = Depends(require_auth),
) -> list[dict]:
"""List active sessions for current user."""
current_session_id = user.get("session_id")
sessions = await get_user_sessions(user["id"])
return [
{
"id": s["id"],
"ip_address": s["ip_address"],
"user_agent": s["user_agent"],
"created_at": s["created_at"].isoformat() if s["created_at"] else None,
"expires_at": s["expires_at"].isoformat() if s["expires_at"] else None,
"is_current": s["id"] == current_session_id,
}
for s in sessions
]revoke_session function · python · L989-L1024 (36 LOC)api/auth/endpoints.py
async def revoke_session(
session_id: str,
request: Request,
response: Response,
user: dict = Depends(require_auth),
) -> dict:
"""Revoke a specific session."""
# Verify session belongs to user
from api.database import user_sessions
session = await database.fetch_one(
user_sessions.select()
.where(user_sessions.c.id == session_id)
.where(user_sessions.c.user_id == user["id"])
)
if not session:
raise HTTPException(status_code=404, detail="Session not found")
await invalidate_session(session_id)
# If revoking current session, clear cookies
if session_id == user.get("session_id"):
_clear_session_cookies(response)
security_logger.info(
"Session revoked",
extra={
"event": "session_revoked",
"user_id": user["id"],
"revoked_session_id": session_id,
"ip_address": _get_client_ip(request),
},
)
return {"message": "Seget_csrf_token function · python · L1028-L1044 (17 LOC)api/auth/endpoints.py
async def get_csrf_token(
session_token: Optional[str] = Cookie(None, alias=SESSION_COOKIE_NAME),
) -> dict:
"""
Get CSRF token for the current session.
The CSRF token is derived from the session token using HMAC.
"""
if not session_token:
raise HTTPException(status_code=401, detail="No session")
try:
csrf_token = _generate_csrf_token(session_token)
except ValueError as e:
raise HTTPException(status_code=500, detail="Server configuration error")
return {"csrf_token": csrf_token}_validate_role function · python · L110-L117 (8 LOC)api/auth/invite.py
def _validate_role(role: str) -> None:
"""Validate role value."""
valid_roles = [r.value for r in Role]
if role not in valid_roles:
raise HTTPException(
status_code=400,
detail=f"Invalid role. Must be one of: {', '.join(valid_roles)}",
)_find_invite_by_token function · python · L120-L135 (16 LOC)api/auth/invite.py
async def _find_invite_by_token(token: str) -> Optional[dict]:
"""Find an invite by token."""
now = datetime.now(timezone.utc)
# Get all pending invites
invites = await database.fetch_all(
user_invites.select()
.where(user_invites.c.used_at.is_(None))
.where(user_invites.c.expires_at > now)
)
for invite in invites:
if verify_token(token, invite["token_hash"]):
return dict(invite)
return NoneRepobility · severity-and-effort ranking · https://repobility.com
list_invites function · python · L144-L177 (34 LOC)api/auth/invite.py
async def list_invites(
pending_only: bool = Query(default=True),
current_user: dict = Depends(require_permission(Permission.INVITE_READ)),
) -> InviteListResponse:
"""
List all invites.
Requires invite:read permission (admin only).
"""
query = user_invites.select()
if pending_only:
now = datetime.now(timezone.utc)
query = query.where(user_invites.c.used_at.is_(None))
query = query.where(user_invites.c.expires_at > now)
query = query.order_by(user_invites.c.created_at.desc())
invites = await database.fetch_all(query)
return InviteListResponse(
invites=[
InviteResponse(
id=i["id"],
email=i["email"],
role=i["role"],
expires_at=i["expires_at"],
created_at=i["created_at"],
used_at=i["used_at"],
)
for i in invites
],
total=len(invites),
)create_invite function · python · L181-L262 (82 LOC)api/auth/invite.py
async def create_invite(
body: CreateInviteRequest,
current_user: dict = Depends(require_permission(Permission.INVITE_CREATE)),
) -> CreateInviteResponse:
"""
Create an invite for a new user.
Requires invite:create permission (admin only).
The token should be sent to the user via email.
"""
_validate_role(body.role)
# Check if email already has a user
existing_user = await database.fetch_one(
users.select().where(users.c.email == body.email.lower())
)
if existing_user:
raise HTTPException(
status_code=400,
detail="A user with this email already exists",
)
# Check for pending invite
now = datetime.now(timezone.utc)
existing_invite = await database.fetch_one(
user_invites.select()
.where(user_invites.c.email == body.email.lower())
.where(user_invites.c.used_at.is_(None))
.where(user_invites.c.expires_at > now)
)
if existing_invite:
revoke_invite function · python · L266-L300 (35 LOC)api/auth/invite.py
async def revoke_invite(
invite_id: str,
current_user: dict = Depends(require_permission(Permission.INVITE_DELETE)),
) -> dict:
"""
Revoke an invite.
Requires invite:delete permission (admin only).
"""
invite = await database.fetch_one(
user_invites.select().where(user_invites.c.id == invite_id)
)
if not invite:
raise HTTPException(status_code=404, detail="Invite not found")
if invite["used_at"]:
raise HTTPException(status_code=400, detail="Invite has already been used")
# Delete the invite
await database.execute(
user_invites.delete().where(user_invites.c.id == invite_id)
)
security_logger.info(
"Invite revoked",
extra={
"event": "invite_revoked",
"invite_id": invite_id,
"email": invite["email"],
"revoked_by": current_user["id"],
},
)
return {"message": "Invite revoked"}validate_invite function · python · L309-L325 (17 LOC)api/auth/invite.py
async def validate_invite(token: str) -> ValidateInviteResponse:
"""
Validate an invite token.
Public endpoint - no authentication required.
"""
invite = await _find_invite_by_token(token)
if not invite:
return ValidateInviteResponse(valid=False)
return ValidateInviteResponse(
valid=True,
email=invite["email"],
role=invite["role"],
expires_at=invite["expires_at"],
)accept_invite function · python · L329-L426 (98 LOC)api/auth/invite.py
async def accept_invite(
token: str,
body: AcceptInviteRequest,
) -> AcceptInviteResponse:
"""
Accept an invite and create a user account.
Public endpoint - no authentication required.
"""
# Check registration mode
if REGISTRATION_MODE == "disabled":
raise HTTPException(
status_code=403,
detail="Registration is currently disabled",
)
# Find and validate invite
invite = await _find_invite_by_token(token)
if not invite:
raise HTTPException(
status_code=400,
detail="Invalid or expired invite token",
)
# Validate username uniqueness
existing = await database.fetch_one(
users.select().where(users.c.username == body.username.lower())
)
if existing:
raise HTTPException(status_code=400, detail="Username already exists")
# Check if email already has a user (race condition check)
existing = await database.fetch_one(
users_get_client_ip function · python · L35-L43 (9 LOC)api/auth/middleware.py
def _get_client_ip(request: Request) -> str:
"""Extract client IP, respecting trusted proxies."""
direct_ip = request.client.host if request.client else "unknown"
forwarded_for = request.headers.get("x-forwarded-for")
if forwarded_for and direct_ip in TRUSTED_PROXIES:
return forwarded_for.split(",")[0].strip()
return direct_ipget_current_user function · python · L46-L77 (32 LOC)api/auth/middleware.py
async def get_current_user(
request: Request,
session_token: Optional[str] = Cookie(None, alias=SESSION_COOKIE_NAME),
api_key: Optional[str] = Security(api_key_header),
) -> Optional[dict]:
"""
Get the current authenticated user from session cookie or API key.
This is a non-enforcing dependency - returns None if not authenticated.
Use require_auth() for endpoints that require authentication.
Args:
request: The FastAPI request
session_token: Session token from cookie
api_key: API key from header
Returns:
User record as dict, or None if not authenticated
"""
# Try session token first
if session_token:
user = await validate_session_token(session_token)
if user:
return user
# Try API key
if api_key:
user = await _authenticate_api_key(api_key, request)
if user:
return user
return Nonerequire_auth function · python · L80-L108 (29 LOC)api/auth/middleware.py
async def require_auth(
request: Request,
session_token: Optional[str] = Cookie(None, alias=SESSION_COOKIE_NAME),
api_key: Optional[str] = Security(api_key_header),
) -> dict:
"""
Require authentication - raises 401 if not authenticated.
Args:
request: The FastAPI request
session_token: Session token from cookie
api_key: API key from header
Returns:
User record as dict
Raises:
HTTPException: 401 if not authenticated
"""
user = await get_current_user(request, session_token, api_key)
if not user:
raise HTTPException(
status_code=401,
detail="Authentication required",
headers={"WWW-Authenticate": "Bearer"},
)
return userIf a scraper extracted this row, it came from Repobility (https://repobility.com)
require_permission function · python · L111-L154 (44 LOC)api/auth/middleware.py
def require_permission(permission: Permission) -> Callable:
"""
Create a dependency that requires a specific permission.
Usage:
@router.post("/videos")
async def create_video(user: dict = Depends(require_permission(Permission.VIDEO_CREATE))):
...
Args:
permission: The required permission
Returns:
A FastAPI dependency function
"""
async def permission_checker(
request: Request,
session_token: Optional[str] = Cookie(None, alias=SESSION_COOKIE_NAME),
api_key: Optional[str] = Security(api_key_header),
) -> dict:
user = await require_auth(request, session_token, api_key)
role = Role(user["role"])
if not has_permission(role, permission):
security_logger.warning(
"Permission denied",
extra={
"event": "permission_denied",
"user_id": user["id"],
"username": user["usrequire_role function · python · L157-L201 (45 LOC)api/auth/middleware.py
def require_role(role: Role) -> Callable:
"""
Create a dependency that requires a specific role or higher.
Role hierarchy: admin > editor > viewer
Args:
role: The minimum required role
Returns:
A FastAPI dependency function
"""
role_hierarchy = {Role.VIEWER: 0, Role.EDITOR: 1, Role.ADMIN: 2}
async def role_checker(
request: Request,
session_token: Optional[str] = Cookie(None, alias=SESSION_COOKIE_NAME),
api_key: Optional[str] = Security(api_key_header),
) -> dict:
user = await require_auth(request, session_token, api_key)
user_role = Role(user["role"])
required_level = role_hierarchy.get(role, 999)
user_level = role_hierarchy.get(user_role, -1)
if user_level < required_level:
security_logger.warning(
"Insufficient role",
extra={
"event": "role_denied",
"user_id": user["id"],
require_ownership_or_permission function · python · L204-L239 (36 LOC)api/auth/middleware.py
async def require_ownership_or_permission(
resource_owner_id: Optional[str],
permission: Permission,
any_permission: Permission,
user: dict,
) -> bool:
"""
Check if user owns resource or has permission to access any.
Args:
resource_owner_id: The owner ID of the resource
permission: The base permission (e.g., VIDEO_UPDATE)
any_permission: The "any" permission (e.g., VIDEO_UPDATE_ANY)
user: The current user
Returns:
True if authorized
Raises:
HTTPException: 403 if not authorized
"""
role = Role(user["role"])
# Check for "any" permission (admin level)
if has_permission(role, any_permission):
return True
# Check ownership + base permission
if has_permission(role, permission):
if resource_owner_id is None or resource_owner_id == user["id"]:
return True
raise HTTPException(
status_code=403,
detail="You don't have permission to acces_authenticate_api_key function · python · L242-L340 (99 LOC)api/auth/middleware.py
async def _authenticate_api_key(api_key: str, request: Request) -> Optional[dict]:
"""
Authenticate a user API key.
Args:
api_key: The API key to authenticate
request: The request for logging context
Returns:
User record if valid, None otherwise
"""
if not api_key or len(api_key) < 8:
return None
ip_address = _get_client_ip(request)
prefix = get_token_prefix(api_key)
now = datetime.now(timezone.utc)
# Find keys with matching prefix
key_records = await database.fetch_all(
user_api_keys.select()
.where(user_api_keys.c.key_prefix == prefix)
.where(user_api_keys.c.revoked_at.is_(None))
)
for key_record in key_records:
key_hash = key_record["key_hash"]
# Support both SHA-256 (new) and argon2id (legacy) hashes
if is_sha256_hash(key_hash):
if not verify_token_fast(api_key, key_hash):
continue
else:
# Legacy arg_update_api_key_last_used function · python · L343-L352 (10 LOC)api/auth/middleware.py
async def _update_api_key_last_used(key_id: str) -> None:
"""Update API key last_used_at in background."""
try:
await database.execute(
user_api_keys.update()
.where(user_api_keys.c.id == key_id)
.values(last_used_at=datetime.now(timezone.utc))
)
except Exception:
pass # Non-critical_check_circuit_breaker function · python · L73-L94 (22 LOC)api/auth/oidc.py
async def _check_circuit_breaker() -> None:
"""Check if circuit breaker is open (thread-safe)."""
async with _circuit_breaker_lock:
if not _circuit_breaker.is_open:
return
# Check if recovery period has passed
if _circuit_breaker.last_failure_time:
recovery_time = _circuit_breaker.last_failure_time + timedelta(
seconds=CIRCUIT_BREAKER_RECOVERY_SECONDS
)
if datetime.now(timezone.utc) > recovery_time:
# Reset circuit breaker (half-open state)
_circuit_breaker.is_open = False
_circuit_breaker.failure_count = 0
logger.info("OIDC circuit breaker reset (recovery period passed)")
return
raise HTTPException(
status_code=503,
detail=f"OIDC provider temporarily unavailable. Please try again in {CIRCUIT_BREAKER_RECOVERY_SECONDS} seconds.",
)_record_failure function · python · L97-L107 (11 LOC)api/auth/oidc.py
async def _record_failure() -> None:
"""Record a failure and potentially open circuit breaker (thread-safe)."""
async with _circuit_breaker_lock:
_circuit_breaker.failure_count += 1
_circuit_breaker.last_failure_time = datetime.now(timezone.utc)
if _circuit_breaker.failure_count >= CIRCUIT_BREAKER_THRESHOLD:
_circuit_breaker.is_open = True
logger.warning(
f"OIDC circuit breaker opened after {_circuit_breaker.failure_count} failures"
)_get_oidc_config function · python · L137-L177 (41 LOC)api/auth/oidc.py
async def _get_oidc_config() -> OIDCConfig:
"""Get OIDC configuration from discovery endpoint (cached)."""
global _oidc_config_cache
if _oidc_config_cache:
cache_age = (datetime.now(timezone.utc) - _oidc_config_cache.cached_at).total_seconds()
if cache_age < OIDC_CONFIG_CACHE_TTL_SECONDS:
return _oidc_config_cache
if not OIDC_DISCOVERY_URL:
raise HTTPException(status_code=500, detail="OIDC not configured")
await _check_circuit_breaker()
try:
async with httpx.AsyncClient(timeout=OIDC_TIMEOUT_SECONDS) as client:
response = await client.get(OIDC_DISCOVERY_URL)
response.raise_for_status()
config = response.json()
except httpx.TimeoutException:
await _record_failure()
raise HTTPException(status_code=503, detail="OIDC provider timeout")
except httpx.ConnectError:
await _record_failure()
raise HTTPException(status_code=503, detail="Cannot connect Same scanner, your repo: https://repobility.com — Repobility
_set_session_cookies function · python · L206-L236 (31 LOC)api/auth/oidc.py
def _set_session_cookies(
response: Response,
session_token: str,
refresh_token: str,
expires_at: datetime,
refresh_expires_at: datetime,
) -> None:
"""Set session and refresh token cookies."""
now = datetime.now(timezone.utc)
session_max_age = int((expires_at - now).total_seconds())
refresh_max_age = int((refresh_expires_at - now).total_seconds())
response.set_cookie(
key=SESSION_COOKIE_NAME,
value=session_token,
max_age=session_max_age,
httponly=True,
secure=SECURE_COOKIES,
samesite="lax",
path="/",
)
response.set_cookie(
key=REFRESH_COOKIE_NAME,
value=refresh_token,
max_age=refresh_max_age,
httponly=True,
secure=SECURE_COOKIES,
samesite="lax",
path="/api/v1/auth/refresh",
)get_oidc_status function · python · L245-L254 (10 LOC)api/auth/oidc.py
async def get_oidc_status() -> OIDCStatusResponse:
"""
Get OIDC configuration status.
Returns whether OIDC is enabled and the provider name.
"""
return OIDCStatusResponse(
enabled=OIDC_ENABLED,
provider_name=OIDC_PROVIDER_NAME if OIDC_ENABLED else "",
)initiate_oidc_authorize function · python · L258-L312 (55 LOC)api/auth/oidc.py
async def initiate_oidc_authorize(
request: Request,
redirect_uri: str = Query(..., description="Where to redirect after auth"),
) -> OIDCAuthorizeResponse:
"""
Initiate OIDC authorization flow.
Generates state and nonce for security, stores them, and returns
the URL to redirect the user to the OIDC provider.
"""
if not OIDC_ENABLED:
raise HTTPException(status_code=400, detail="OIDC is not enabled")
config = await _get_oidc_config()
# Generate cryptographic state and nonce
state = secrets.token_urlsafe(32)
nonce = secrets.token_urlsafe(32)
now = datetime.now(timezone.utc)
expires_at = now + timedelta(minutes=OIDC_STATE_EXPIRY_MINUTES)
# Store state in database
state_id = str(uuid.uuid4())
await database.execute(
oidc_states.insert().values(
id=state_id,
state=state,
nonce=nonce,
redirect_uri=redirect_uri,
expires_at=expires_at,