← back to filthyrake__vlog

Function bodies 1,000 total

All specs Real LLM only Function bodies
RedisAnalyticsCache.cleanup_expired method · python · L420-L430 (11 LOC)
api/analytics_cache.py
    def cleanup_expired(self) -> int:
        """
        Remove all expired entries from cache.

        Redis handles TTL-based expiration automatically, so this is a no-op.

        Returns:
            Always returns 0 (Redis handles expiration)
        """
        # Redis handles TTL expiration automatically
        return 0
RedisAnalyticsCache.get_stats method · python · L432-L467 (36 LOC)
api/analytics_cache.py
    def get_stats(self) -> Dict[str, Any]:
        """
        Get cache statistics.

        Returns:
            Dict with cache stats (TTL, enabled status, backend type, connection status)
        """
        def count_keys():
            start_time = time.time()
            entry_count = 0
            cursor = 0
            pattern = f"{self.CACHE_KEY_PREFIX}*"
            while True:
                # Check timeout to prevent blocking on large keyspaces
                if time.time() - start_time > REDIS_BULK_OPERATION_TIMEOUT:
                    logger.warning(
                        f"Redis key count timed out after {REDIS_BULK_OPERATION_TIMEOUT}s, "
                        f"returning partial count: {entry_count}"
                    )
                    return entry_count
                cursor, keys = self._client.scan(cursor, match=pattern, count=REDIS_SCAN_BATCH_SIZE)
                entry_count += len(keys)
                if cursor == 0:
                    break
     
create_analytics_cache function · python · L474-L509 (36 LOC)
api/analytics_cache.py
def create_analytics_cache(
    storage_url: str = "memory://",
    ttl_seconds: int = DEFAULT_CACHE_TTL_SECONDS,
    enabled: bool = True,
    max_size: int = DEFAULT_CACHE_MAX_SIZE,
) -> AnalyticsCacheType:
    """
    Factory function to create the appropriate analytics cache implementation.

    Args:
        storage_url: Storage backend URL. Use "memory://" for in-memory cache,
                    or a Redis URL like "redis://localhost:6379" for shared cache.
        ttl_seconds: Time to live in seconds for cache entries
        enabled: Whether caching is enabled
        max_size: Maximum entries for in-memory cache (ignored for Redis)

    Returns:
        Either AnalyticsCache (memory) or RedisAnalyticsCache (Redis) instance
    """
    if not enabled:
        # Return disabled memory cache - simplest option
        return AnalyticsCache(ttl_seconds=ttl_seconds, enabled=False, max_size=max_size)

    if storage_url.startswith("redis://") or storage_url.startswith("rediss://"):
AuditLogger.__init__ method · python · L155-L161 (7 LOC)
api/audit.py
    def __init__(self):
        self.logger = logging.getLogger("vlog.audit")
        self.logger.setLevel(getattr(logging, AUDIT_LOG_LEVEL, logging.INFO))
        self.logger.propagate = False  # Don't propagate to root logger

        if not self.logger.handlers:
            self._setup_handlers()
AuditLogger._setup_handlers method · python · L163-L187 (25 LOC)
api/audit.py
    def _setup_handlers(self):
        """Set up logging handlers with rotation support."""
        formatter = logging.Formatter("%(message)s")  # Raw JSON output

        if AUDIT_LOG_ENABLED:
            try:
                # Use RotatingFileHandler for automatic log rotation
                # Rotates when file reaches AUDIT_LOG_MAX_BYTES (default 10MB)
                # Keeps AUDIT_LOG_BACKUP_COUNT backup files (default 5)
                file_handler = RotatingFileHandler(
                    AUDIT_LOG_PATH,
                    maxBytes=AUDIT_LOG_MAX_BYTES,
                    backupCount=AUDIT_LOG_BACKUP_COUNT,
                    encoding="utf-8",
                )
                file_handler.setFormatter(formatter)
                self.logger.addHandler(file_handler)
            except (PermissionError, OSError):
                # Fall back to console logging
                console_handler = logging.StreamHandler()
                console_handler.setFormatter(formatter)
    
AuditLogger.log method · python · L189-L247 (59 LOC)
api/audit.py
    def log(
        self,
        action: AuditAction,
        client_ip: Optional[str] = None,
        user_agent: Optional[str] = None,
        resource_type: Optional[str] = None,
        resource_id: Optional[Any] = None,
        resource_name: Optional[str] = None,
        details: Optional[dict] = None,
        success: bool = True,
        error: Optional[str] = None,
        request_id: Optional[str] = None,
    ):
        """
        Log an audit event.

        Args:
            action: The type of action being performed
            client_ip: IP address of the client making the request
            user_agent: User-Agent header from the request
            resource_type: Type of resource (video, category, etc.)
            resource_id: ID of the affected resource
            resource_name: Human-readable name of the resource (slug, title, etc.)
            details: Additional action-specific details
            success: Whether the action succeeded
            error: Error m
log_audit function · python · L254-L291 (38 LOC)
api/audit.py
def log_audit(
    action: AuditAction,
    client_ip: Optional[str] = None,
    user_agent: Optional[str] = None,
    resource_type: Optional[str] = None,
    resource_id: Optional[Any] = None,
    resource_name: Optional[str] = None,
    details: Optional[dict] = None,
    success: bool = True,
    error: Optional[str] = None,
    request_id: Optional[str] = None,
):
    """
    Convenience function for logging audit events.

    Example usage:
        log_audit(
            AuditAction.VIDEO_UPLOAD,
            client_ip=request.client.host,
            resource_type="video",
            resource_id=video_id,
            resource_name=slug,
            details={"title": title, "category_id": category_id},
            request_id=get_request_id(request)
        )
    """
    audit_logger.log(
        action=action,
        client_ip=client_ip,
        user_agent=user_agent,
        resource_type=resource_type,
        resource_id=resource_id,
        resource_name=resource_name,
     
If a scraper extracted this row, it came from Repobility (https://repobility.com)
list_api_keys function · python · L72-L100 (29 LOC)
api/auth/api_keys.py
async def list_api_keys(
    user: dict = Depends(require_auth),
) -> ApiKeyListResponse:
    """
    List all API keys for the current user.

    Note: The actual key values are never returned after creation.
    """
    keys = await database.fetch_all(
        user_api_keys.select()
        .where(user_api_keys.c.user_id == user["id"])
        .where(user_api_keys.c.revoked_at.is_(None))
        .order_by(user_api_keys.c.created_at.desc())
    )

    return ApiKeyListResponse(
        keys=[
            ApiKeyResponse(
                id=k["id"],
                name=k["name"],
                key_prefix=k["key_prefix"],
                expires_at=k["expires_at"],
                last_used_at=k["last_used_at"],
                created_at=k["created_at"],
            )
            for k in keys
        ],
        total=len(keys),
    )
create_api_key function · python · L104-L158 (55 LOC)
api/auth/api_keys.py
async def create_api_key(
    body: CreateApiKeyRequest,
    user: dict = Depends(require_auth),
) -> CreateApiKeyResponse:
    """
    Create a new API key.

    IMPORTANT: The key is only returned once. Store it securely.
    """
    now = datetime.now(timezone.utc)

    # Generate key
    api_key = generate_token(32)  # ~44 character URL-safe token
    key_hash = hash_token_fast(api_key)  # SHA-256 for fast verification
    key_prefix = get_token_prefix(api_key)

    # Calculate expiry
    expires_at = None
    if body.expires_in_days:
        expires_at = now + timedelta(days=body.expires_in_days)

    key_id = str(uuid.uuid4())

    await database.execute(
        user_api_keys.insert().values(
            id=key_id,
            user_id=user["id"],
            name=body.name.strip(),
            key_prefix=key_prefix,
            key_hash=key_hash,
            expires_at=expires_at,
            created_at=now,
        )
    )

    security_logger.info(
        "API key created",
 
get_api_key function · python · L162-L184 (23 LOC)
api/auth/api_keys.py
async def get_api_key(
    key_id: str,
    user: dict = Depends(require_auth),
) -> ApiKeyResponse:
    """Get details of a specific API key."""
    key = await database.fetch_one(
        user_api_keys.select()
        .where(user_api_keys.c.id == key_id)
        .where(user_api_keys.c.user_id == user["id"])
        .where(user_api_keys.c.revoked_at.is_(None))
    )

    if not key:
        raise HTTPException(status_code=404, detail="API key not found")

    return ApiKeyResponse(
        id=key["id"],
        name=key["name"],
        key_prefix=key["key_prefix"],
        expires_at=key["expires_at"],
        last_used_at=key["last_used_at"],
        created_at=key["created_at"],
    )
revoke_api_key function · python · L188-L224 (37 LOC)
api/auth/api_keys.py
async def revoke_api_key(
    key_id: str,
    user: dict = Depends(require_auth),
) -> dict:
    """
    Revoke an API key.

    The key will immediately stop working.
    """
    key = await database.fetch_one(
        user_api_keys.select()
        .where(user_api_keys.c.id == key_id)
        .where(user_api_keys.c.user_id == user["id"])
        .where(user_api_keys.c.revoked_at.is_(None))
    )

    if not key:
        raise HTTPException(status_code=404, detail="API key not found")

    await database.execute(
        user_api_keys.update()
        .where(user_api_keys.c.id == key_id)
        .values(revoked_at=datetime.now(timezone.utc))
    )

    security_logger.info(
        "API key revoked",
        extra={
            "event": "api_key_revoked",
            "user_id": user["id"],
            "key_id": key_id,
            "key_name": key["name"],
            "key_prefix": key["key_prefix"],
        },
    )

    return {"message": "API key revoked"}
_set_session_cookies function · python · L186-L217 (32 LOC)
api/auth/endpoints.py
def _set_session_cookies(
    response: Response,
    session_token: str,
    refresh_token: str,
    expires_at: datetime,
    refresh_expires_at: datetime,
) -> None:
    """Set session and refresh token cookies."""
    # Calculate max_age in seconds
    now = datetime.now(timezone.utc)
    session_max_age = int((expires_at - now).total_seconds())
    refresh_max_age = int((refresh_expires_at - now).total_seconds())

    response.set_cookie(
        key=SESSION_COOKIE_NAME,
        value=session_token,
        max_age=session_max_age,
        httponly=True,
        secure=SECURE_COOKIES,
        samesite="lax",
        path="/",
    )

    response.set_cookie(
        key=REFRESH_COOKIE_NAME,
        value=refresh_token,
        max_age=refresh_max_age,
        httponly=True,
        secure=SECURE_COOKIES,
        samesite="lax",
        path="/api/v1/auth/refresh",  # Only sent to refresh endpoint
    )
_generate_csrf_token function · python · L226-L235 (10 LOC)
api/auth/endpoints.py
def _generate_csrf_token(session_token: str) -> str:
    """Generate CSRF token from session token."""
    if not SESSION_SECRET_KEY:
        raise ValueError("SESSION_SECRET_KEY not configured")

    return hmac.new(
        SESSION_SECRET_KEY.encode(),
        session_token.encode(),
        "sha256",
    ).hexdigest()[:32]
_increment_failed_login function · python · L238-L276 (39 LOC)
api/auth/endpoints.py
async def _increment_failed_login(user_id: str) -> None:
    """Increment failed login count and potentially lock account."""
    now = datetime.now(timezone.utc)

    user = await database.fetch_one(
        users.select().where(users.c.id == user_id)
    )

    if not user:
        return

    new_count = (user["failed_login_attempts"] or 0) + 1

    if new_count >= LOGIN_LOCKOUT_THRESHOLD:
        # Lock account
        locked_until = now + timedelta(minutes=LOGIN_LOCKOUT_DURATION_MINUTES)
        await database.execute(
            users.update()
            .where(users.c.id == user_id)
            .values(
                failed_login_attempts=new_count,
                locked_until=locked_until,
            )
        )
        security_logger.warning(
            "Account locked due to failed logins",
            extra={
                "event": "account_locked",
                "user_id": user_id,
                "failed_attempts": new_count,
                "locked_until": loc
_reset_failed_login function · python · L279-L289 (11 LOC)
api/auth/endpoints.py
async def _reset_failed_login(user_id: str) -> None:
    """Reset failed login count after successful login."""
    await database.execute(
        users.update()
        .where(users.c.id == user_id)
        .values(
            failed_login_attempts=0,
            locked_until=None,
            last_login_at=datetime.now(timezone.utc),
        )
    )
Same scanner, your repo: https://repobility.com — Repobility
get_setup_status function · python · L298-L325 (28 LOC)
api/auth/endpoints.py
async def get_setup_status() -> SetupStatusResponse:
    """
    Check if initial setup is required.

    Returns needs_setup=True if no users exist in the system.
    This endpoint is always public.
    """
    try:
        user_count = await database.fetch_val(
            "SELECT COUNT(*) FROM users"
        )
    except Exception:
        logger.exception("Database error checking setup status")
        raise HTTPException(
            status_code=503,
            detail="Service temporarily unavailable. Please try again.",
        )

    if user_count == 0:
        return SetupStatusResponse(
            needs_setup=True,
            message="No users exist. Please create an admin account to get started.",
        )

    return SetupStatusResponse(
        needs_setup=False,
        message="Setup complete. Please log in.",
    )
create_initial_admin function · python · L329-L426 (98 LOC)
api/auth/endpoints.py
async def create_initial_admin(
    request: Request,
    response: Response,
    body: SetupRequest,
) -> SetupResponse:
    """
    Create the initial admin account.

    This endpoint only works when no users exist in the system.
    Once an admin is created, this endpoint returns 403.
    """
    # Validate password strength first (before any DB operations)
    is_valid, error = validate_password_strength(body.password)
    if not is_valid:
        raise HTTPException(status_code=400, detail=error)

    now = datetime.now(timezone.utc)
    user_id = str(uuid.uuid4())
    password_hash = hash_password(body.password)
    ip_address = _get_client_ip(request)
    user_agent = request.headers.get("user-agent")

    # Use transaction to prevent TOCTOU race condition:
    # - Check user count and insert atomically
    # - Unique constraints on username/email handle concurrent requests
    try:
        async with database.transaction():
            # Check if any users exist (within transa
login function · python · L430-L570 (141 LOC)
api/auth/endpoints.py
async def login(
    request: Request,
    response: Response,
    body: LoginRequest,
) -> LoginResponse:
    """
    Authenticate with username/email and password.

    Sets HTTP-only session cookies on success.
    """
    ip_address = _get_client_ip(request)
    user_agent = request.headers.get("user-agent")
    login_input = body.username_or_email.lower()
    # Hash identifier for logging (avoid PII in logs)
    identifier_hash = hashlib.sha256(login_input.encode()).hexdigest()[:16]

    # Find user by email or username (case-insensitive)
    try:
        user = await database.fetch_one(
            users.select().where(
                or_(
                    func.lower(users.c.email) == login_input,
                    func.lower(users.c.username) == login_input,
                )
            )
        )
    except Exception as e:
        logger.exception("Database error during login user lookup")
        raise HTTPException(
            status_code=503,
            detail="Ser
logout function · python · L574-L598 (25 LOC)
api/auth/endpoints.py
async def logout(
    request: Request,
    response: Response,
    user: dict = Depends(require_auth),
) -> dict:
    """
    Log out and invalidate current session.
    """
    session_id = user.get("session_id")
    if session_id:
        await invalidate_session(session_id)

    _clear_session_cookies(response)

    security_logger.info(
        "Logout",
        extra={
            "event": "logout",
            "user_id": user["id"],
            "session_id": session_id,
            "ip_address": _get_client_ip(request),
        },
    )

    return {"message": "Logged out successfully"}
refresh function · python · L602-L656 (55 LOC)
api/auth/endpoints.py
async def refresh(
    request: Request,
    response: Response,
    refresh_token: Optional[str] = Cookie(None, alias=REFRESH_COOKIE_NAME),
) -> LoginResponse:
    """
    Refresh session using refresh token.

    Implements token rotation - old tokens become invalid after use.
    """
    if not refresh_token:
        raise HTTPException(status_code=401, detail="No refresh token provided")

    ip_address = _get_client_ip(request)
    user_agent = request.headers.get("user-agent")

    try:
        new_session, new_refresh, expires_at, refresh_expires_at = await refresh_user_session(
            refresh_token=refresh_token,
            ip_address=ip_address,
            user_agent=user_agent,
        )
    except RefreshTokenReusedError:
        _clear_session_cookies(response)
        raise HTTPException(
            status_code=401,
            detail="Session invalidated for security. Please log in again.",
        )
    except (SessionExpiredError, SessionRevokedError):
        _
check_auth function · python · L660-L702 (43 LOC)
api/auth/endpoints.py
async def check_auth(
    request: Request,
    session_token: Optional[str] = Cookie(None, alias=SESSION_COOKIE_NAME),
) -> AuthCheckResponse:
    """
    Check if the current session is valid.

    Returns user info if authenticated, or authenticated=false if not.
    """
    if not session_token:
        return AuthCheckResponse(
            authenticated=False,
            oidc_enabled=OIDC_ENABLED,
            oidc_provider_name=OIDC_PROVIDER_NAME,
        )

    user = await validate_session_token(session_token, allow_grace_period=True)

    if not user:
        return AuthCheckResponse(
            authenticated=False,
            oidc_enabled=OIDC_ENABLED,
            oidc_provider_name=OIDC_PROVIDER_NAME,
        )

    # Get permissions for user's role
    role_permissions = get_role_permissions(user["role"])
    permissions = [p.value for p in role_permissions]

    return AuthCheckResponse(
        authenticated=True,
        oidc_enabled=OIDC_ENABLED,
        oidc_provider
get_current_user_info function · python · L706-L718 (13 LOC)
api/auth/endpoints.py
async def get_current_user_info(user: dict = Depends(require_auth)) -> dict:
    """Get current user profile."""
    return {
        "id": user["id"],
        "username": user["username"],
        "email": user["email"],
        "display_name": user["display_name"],
        "avatar_url": user["avatar_url"],
        "role": user["role"],
        "email_verified": user["email_verified"],
        "created_at": user["created_at"].isoformat() if user["created_at"] else None,
        "last_login_at": user["last_login_at"].isoformat() if user["last_login_at"] else None,
    }
update_profile function · python · L722-L753 (32 LOC)
api/auth/endpoints.py
async def update_profile(
    body: ProfileUpdateRequest,
    user: dict = Depends(require_auth),
) -> dict:
    """Update current user profile."""
    updates = {}

    if body.display_name is not None:
        updates["display_name"] = body.display_name.strip() if body.display_name else None

    if body.avatar_url is not None:
        updates["avatar_url"] = body.avatar_url.strip() if body.avatar_url else None

    if updates:
        updates["updated_at"] = datetime.now(timezone.utc)
        await database.execute(
            users.update().where(users.c.id == user["id"]).values(**updates)
        )

    # Return updated user
    updated_user = await database.fetch_one(
        users.select().where(users.c.id == user["id"])
    )

    return {
        "id": updated_user["id"],
        "username": updated_user["username"],
        "email": updated_user["email"],
        "display_name": updated_user["display_name"],
        "avatar_url": updated_user["avatar_url"],
        "role": u
Methodology: Repobility · https://repobility.com/research/state-of-ai-code-2026/
change_password function · python · L757-L810 (54 LOC)
api/auth/endpoints.py
async def change_password(
    request: Request,
    body: PasswordChangeRequest,
    user: dict = Depends(require_auth),
) -> dict:
    """Change current user's password."""
    # Verify current password
    current_user = await database.fetch_one(
        users.select().where(users.c.id == user["id"])
    )

    if not current_user["password_hash"]:
        raise HTTPException(
            status_code=400,
            detail="Cannot change password for SSO-only account",
        )

    if not verify_password(body.current_password, current_user["password_hash"]):
        security_logger.warning(
            "Password change failed: invalid current password",
            extra={
                "event": "password_change_failed",
                "user_id": user["id"],
                "ip_address": _get_client_ip(request),
            },
        )
        raise HTTPException(status_code=401, detail="Current password is incorrect")

    # Validate new password
    is_valid, error = valida
forgot_password function · python · L814-L881 (68 LOC)
api/auth/endpoints.py
async def forgot_password(
    request: Request,
    body: ForgotPasswordRequest,
) -> dict:
    """
    Request password reset.

    Always returns success to prevent email enumeration.
    """
    # Check if password reset is enabled (requires email delivery to be configured)
    if not PASSWORD_RESET_ENABLED:
        raise HTTPException(
            status_code=503,
            detail="Password reset is not available. Please contact an administrator.",
        )

    ip_address = _get_client_ip(request)
    now = datetime.now(timezone.utc)

    # Always return success (constant-time response)
    success_response = {"message": "If an account exists with this email, a reset link has been sent"}

    # Find user
    user = await database.fetch_one(
        users.select().where(users.c.email == body.email.lower())
    )

    if not user:
        # Simulate work to prevent timing attacks
        hash_password("dummy")
        return success_response

    if not user["password_hash"]:
  
reset_password function · python · L885-L963 (79 LOC)
api/auth/endpoints.py
async def reset_password(
    request: Request,
    body: ResetPasswordRequest,
) -> dict:
    """Reset password using token."""
    # Check if password reset is enabled
    if not PASSWORD_RESET_ENABLED:
        raise HTTPException(
            status_code=503,
            detail="Password reset is not available. Please contact an administrator.",
        )

    ip_address = _get_client_ip(request)
    now = datetime.now(timezone.utc)

    # Validate new password
    is_valid, error = validate_password_strength(body.new_password)
    if not is_valid:
        raise HTTPException(status_code=400, detail=error)

    # Find token
    all_tokens = await database.fetch_all(
        password_reset_tokens.select()
        .where(password_reset_tokens.c.used_at.is_(None))
        .where(password_reset_tokens.c.expires_at > now)
    )

    from api.auth.password import verify_token

    valid_token = None
    for token_record in all_tokens:
        if verify_token(body.token, token_record["toke
list_sessions function · python · L967-L985 (19 LOC)
api/auth/endpoints.py
async def list_sessions(
    request: Request,
    user: dict = Depends(require_auth),
) -> list[dict]:
    """List active sessions for current user."""
    current_session_id = user.get("session_id")
    sessions = await get_user_sessions(user["id"])

    return [
        {
            "id": s["id"],
            "ip_address": s["ip_address"],
            "user_agent": s["user_agent"],
            "created_at": s["created_at"].isoformat() if s["created_at"] else None,
            "expires_at": s["expires_at"].isoformat() if s["expires_at"] else None,
            "is_current": s["id"] == current_session_id,
        }
        for s in sessions
    ]
revoke_session function · python · L989-L1024 (36 LOC)
api/auth/endpoints.py
async def revoke_session(
    session_id: str,
    request: Request,
    response: Response,
    user: dict = Depends(require_auth),
) -> dict:
    """Revoke a specific session."""
    # Verify session belongs to user
    from api.database import user_sessions

    session = await database.fetch_one(
        user_sessions.select()
        .where(user_sessions.c.id == session_id)
        .where(user_sessions.c.user_id == user["id"])
    )

    if not session:
        raise HTTPException(status_code=404, detail="Session not found")

    await invalidate_session(session_id)

    # If revoking current session, clear cookies
    if session_id == user.get("session_id"):
        _clear_session_cookies(response)

    security_logger.info(
        "Session revoked",
        extra={
            "event": "session_revoked",
            "user_id": user["id"],
            "revoked_session_id": session_id,
            "ip_address": _get_client_ip(request),
        },
    )

    return {"message": "Se
get_csrf_token function · python · L1028-L1044 (17 LOC)
api/auth/endpoints.py
async def get_csrf_token(
    session_token: Optional[str] = Cookie(None, alias=SESSION_COOKIE_NAME),
) -> dict:
    """
    Get CSRF token for the current session.

    The CSRF token is derived from the session token using HMAC.
    """
    if not session_token:
        raise HTTPException(status_code=401, detail="No session")

    try:
        csrf_token = _generate_csrf_token(session_token)
    except ValueError as e:
        raise HTTPException(status_code=500, detail="Server configuration error")

    return {"csrf_token": csrf_token}
_validate_role function · python · L110-L117 (8 LOC)
api/auth/invite.py
def _validate_role(role: str) -> None:
    """Validate role value."""
    valid_roles = [r.value for r in Role]
    if role not in valid_roles:
        raise HTTPException(
            status_code=400,
            detail=f"Invalid role. Must be one of: {', '.join(valid_roles)}",
        )
_find_invite_by_token function · python · L120-L135 (16 LOC)
api/auth/invite.py
async def _find_invite_by_token(token: str) -> Optional[dict]:
    """Find an invite by token."""
    now = datetime.now(timezone.utc)

    # Get all pending invites
    invites = await database.fetch_all(
        user_invites.select()
        .where(user_invites.c.used_at.is_(None))
        .where(user_invites.c.expires_at > now)
    )

    for invite in invites:
        if verify_token(token, invite["token_hash"]):
            return dict(invite)

    return None
Repobility · severity-and-effort ranking · https://repobility.com
list_invites function · python · L144-L177 (34 LOC)
api/auth/invite.py
async def list_invites(
    pending_only: bool = Query(default=True),
    current_user: dict = Depends(require_permission(Permission.INVITE_READ)),
) -> InviteListResponse:
    """
    List all invites.

    Requires invite:read permission (admin only).
    """
    query = user_invites.select()

    if pending_only:
        now = datetime.now(timezone.utc)
        query = query.where(user_invites.c.used_at.is_(None))
        query = query.where(user_invites.c.expires_at > now)

    query = query.order_by(user_invites.c.created_at.desc())

    invites = await database.fetch_all(query)

    return InviteListResponse(
        invites=[
            InviteResponse(
                id=i["id"],
                email=i["email"],
                role=i["role"],
                expires_at=i["expires_at"],
                created_at=i["created_at"],
                used_at=i["used_at"],
            )
            for i in invites
        ],
        total=len(invites),
    )
create_invite function · python · L181-L262 (82 LOC)
api/auth/invite.py
async def create_invite(
    body: CreateInviteRequest,
    current_user: dict = Depends(require_permission(Permission.INVITE_CREATE)),
) -> CreateInviteResponse:
    """
    Create an invite for a new user.

    Requires invite:create permission (admin only).
    The token should be sent to the user via email.
    """
    _validate_role(body.role)

    # Check if email already has a user
    existing_user = await database.fetch_one(
        users.select().where(users.c.email == body.email.lower())
    )
    if existing_user:
        raise HTTPException(
            status_code=400,
            detail="A user with this email already exists",
        )

    # Check for pending invite
    now = datetime.now(timezone.utc)
    existing_invite = await database.fetch_one(
        user_invites.select()
        .where(user_invites.c.email == body.email.lower())
        .where(user_invites.c.used_at.is_(None))
        .where(user_invites.c.expires_at > now)
    )
    if existing_invite:
       
revoke_invite function · python · L266-L300 (35 LOC)
api/auth/invite.py
async def revoke_invite(
    invite_id: str,
    current_user: dict = Depends(require_permission(Permission.INVITE_DELETE)),
) -> dict:
    """
    Revoke an invite.

    Requires invite:delete permission (admin only).
    """
    invite = await database.fetch_one(
        user_invites.select().where(user_invites.c.id == invite_id)
    )

    if not invite:
        raise HTTPException(status_code=404, detail="Invite not found")

    if invite["used_at"]:
        raise HTTPException(status_code=400, detail="Invite has already been used")

    # Delete the invite
    await database.execute(
        user_invites.delete().where(user_invites.c.id == invite_id)
    )

    security_logger.info(
        "Invite revoked",
        extra={
            "event": "invite_revoked",
            "invite_id": invite_id,
            "email": invite["email"],
            "revoked_by": current_user["id"],
        },
    )

    return {"message": "Invite revoked"}
validate_invite function · python · L309-L325 (17 LOC)
api/auth/invite.py
async def validate_invite(token: str) -> ValidateInviteResponse:
    """
    Validate an invite token.

    Public endpoint - no authentication required.
    """
    invite = await _find_invite_by_token(token)

    if not invite:
        return ValidateInviteResponse(valid=False)

    return ValidateInviteResponse(
        valid=True,
        email=invite["email"],
        role=invite["role"],
        expires_at=invite["expires_at"],
    )
accept_invite function · python · L329-L426 (98 LOC)
api/auth/invite.py
async def accept_invite(
    token: str,
    body: AcceptInviteRequest,
) -> AcceptInviteResponse:
    """
    Accept an invite and create a user account.

    Public endpoint - no authentication required.
    """
    # Check registration mode
    if REGISTRATION_MODE == "disabled":
        raise HTTPException(
            status_code=403,
            detail="Registration is currently disabled",
        )

    # Find and validate invite
    invite = await _find_invite_by_token(token)

    if not invite:
        raise HTTPException(
            status_code=400,
            detail="Invalid or expired invite token",
        )

    # Validate username uniqueness
    existing = await database.fetch_one(
        users.select().where(users.c.username == body.username.lower())
    )
    if existing:
        raise HTTPException(status_code=400, detail="Username already exists")

    # Check if email already has a user (race condition check)
    existing = await database.fetch_one(
        users
_get_client_ip function · python · L35-L43 (9 LOC)
api/auth/middleware.py
def _get_client_ip(request: Request) -> str:
    """Extract client IP, respecting trusted proxies."""
    direct_ip = request.client.host if request.client else "unknown"

    forwarded_for = request.headers.get("x-forwarded-for")
    if forwarded_for and direct_ip in TRUSTED_PROXIES:
        return forwarded_for.split(",")[0].strip()

    return direct_ip
get_current_user function · python · L46-L77 (32 LOC)
api/auth/middleware.py
async def get_current_user(
    request: Request,
    session_token: Optional[str] = Cookie(None, alias=SESSION_COOKIE_NAME),
    api_key: Optional[str] = Security(api_key_header),
) -> Optional[dict]:
    """
    Get the current authenticated user from session cookie or API key.

    This is a non-enforcing dependency - returns None if not authenticated.
    Use require_auth() for endpoints that require authentication.

    Args:
        request: The FastAPI request
        session_token: Session token from cookie
        api_key: API key from header

    Returns:
        User record as dict, or None if not authenticated
    """
    # Try session token first
    if session_token:
        user = await validate_session_token(session_token)
        if user:
            return user

    # Try API key
    if api_key:
        user = await _authenticate_api_key(api_key, request)
        if user:
            return user

    return None
require_auth function · python · L80-L108 (29 LOC)
api/auth/middleware.py
async def require_auth(
    request: Request,
    session_token: Optional[str] = Cookie(None, alias=SESSION_COOKIE_NAME),
    api_key: Optional[str] = Security(api_key_header),
) -> dict:
    """
    Require authentication - raises 401 if not authenticated.

    Args:
        request: The FastAPI request
        session_token: Session token from cookie
        api_key: API key from header

    Returns:
        User record as dict

    Raises:
        HTTPException: 401 if not authenticated
    """
    user = await get_current_user(request, session_token, api_key)

    if not user:
        raise HTTPException(
            status_code=401,
            detail="Authentication required",
            headers={"WWW-Authenticate": "Bearer"},
        )

    return user
If a scraper extracted this row, it came from Repobility (https://repobility.com)
require_permission function · python · L111-L154 (44 LOC)
api/auth/middleware.py
def require_permission(permission: Permission) -> Callable:
    """
    Create a dependency that requires a specific permission.

    Usage:
        @router.post("/videos")
        async def create_video(user: dict = Depends(require_permission(Permission.VIDEO_CREATE))):
            ...

    Args:
        permission: The required permission

    Returns:
        A FastAPI dependency function
    """

    async def permission_checker(
        request: Request,
        session_token: Optional[str] = Cookie(None, alias=SESSION_COOKIE_NAME),
        api_key: Optional[str] = Security(api_key_header),
    ) -> dict:
        user = await require_auth(request, session_token, api_key)

        role = Role(user["role"])
        if not has_permission(role, permission):
            security_logger.warning(
                "Permission denied",
                extra={
                    "event": "permission_denied",
                    "user_id": user["id"],
                    "username": user["us
require_role function · python · L157-L201 (45 LOC)
api/auth/middleware.py
def require_role(role: Role) -> Callable:
    """
    Create a dependency that requires a specific role or higher.

    Role hierarchy: admin > editor > viewer

    Args:
        role: The minimum required role

    Returns:
        A FastAPI dependency function
    """
    role_hierarchy = {Role.VIEWER: 0, Role.EDITOR: 1, Role.ADMIN: 2}

    async def role_checker(
        request: Request,
        session_token: Optional[str] = Cookie(None, alias=SESSION_COOKIE_NAME),
        api_key: Optional[str] = Security(api_key_header),
    ) -> dict:
        user = await require_auth(request, session_token, api_key)

        user_role = Role(user["role"])
        required_level = role_hierarchy.get(role, 999)
        user_level = role_hierarchy.get(user_role, -1)

        if user_level < required_level:
            security_logger.warning(
                "Insufficient role",
                extra={
                    "event": "role_denied",
                    "user_id": user["id"],
        
require_ownership_or_permission function · python · L204-L239 (36 LOC)
api/auth/middleware.py
async def require_ownership_or_permission(
    resource_owner_id: Optional[str],
    permission: Permission,
    any_permission: Permission,
    user: dict,
) -> bool:
    """
    Check if user owns resource or has permission to access any.

    Args:
        resource_owner_id: The owner ID of the resource
        permission: The base permission (e.g., VIDEO_UPDATE)
        any_permission: The "any" permission (e.g., VIDEO_UPDATE_ANY)
        user: The current user

    Returns:
        True if authorized

    Raises:
        HTTPException: 403 if not authorized
    """
    role = Role(user["role"])

    # Check for "any" permission (admin level)
    if has_permission(role, any_permission):
        return True

    # Check ownership + base permission
    if has_permission(role, permission):
        if resource_owner_id is None or resource_owner_id == user["id"]:
            return True

    raise HTTPException(
        status_code=403,
        detail="You don't have permission to acces
_authenticate_api_key function · python · L242-L340 (99 LOC)
api/auth/middleware.py
async def _authenticate_api_key(api_key: str, request: Request) -> Optional[dict]:
    """
    Authenticate a user API key.

    Args:
        api_key: The API key to authenticate
        request: The request for logging context

    Returns:
        User record if valid, None otherwise
    """
    if not api_key or len(api_key) < 8:
        return None

    ip_address = _get_client_ip(request)
    prefix = get_token_prefix(api_key)
    now = datetime.now(timezone.utc)

    # Find keys with matching prefix
    key_records = await database.fetch_all(
        user_api_keys.select()
        .where(user_api_keys.c.key_prefix == prefix)
        .where(user_api_keys.c.revoked_at.is_(None))
    )

    for key_record in key_records:
        key_hash = key_record["key_hash"]
        # Support both SHA-256 (new) and argon2id (legacy) hashes
        if is_sha256_hash(key_hash):
            if not verify_token_fast(api_key, key_hash):
                continue
        else:
            # Legacy arg
_update_api_key_last_used function · python · L343-L352 (10 LOC)
api/auth/middleware.py
async def _update_api_key_last_used(key_id: str) -> None:
    """Update API key last_used_at in background."""
    try:
        await database.execute(
            user_api_keys.update()
            .where(user_api_keys.c.id == key_id)
            .values(last_used_at=datetime.now(timezone.utc))
        )
    except Exception:
        pass  # Non-critical
_check_circuit_breaker function · python · L73-L94 (22 LOC)
api/auth/oidc.py
async def _check_circuit_breaker() -> None:
    """Check if circuit breaker is open (thread-safe)."""
    async with _circuit_breaker_lock:
        if not _circuit_breaker.is_open:
            return

        # Check if recovery period has passed
        if _circuit_breaker.last_failure_time:
            recovery_time = _circuit_breaker.last_failure_time + timedelta(
                seconds=CIRCUIT_BREAKER_RECOVERY_SECONDS
            )
            if datetime.now(timezone.utc) > recovery_time:
                # Reset circuit breaker (half-open state)
                _circuit_breaker.is_open = False
                _circuit_breaker.failure_count = 0
                logger.info("OIDC circuit breaker reset (recovery period passed)")
                return

        raise HTTPException(
            status_code=503,
            detail=f"OIDC provider temporarily unavailable. Please try again in {CIRCUIT_BREAKER_RECOVERY_SECONDS} seconds.",
        )
_record_failure function · python · L97-L107 (11 LOC)
api/auth/oidc.py
async def _record_failure() -> None:
    """Record a failure and potentially open circuit breaker (thread-safe)."""
    async with _circuit_breaker_lock:
        _circuit_breaker.failure_count += 1
        _circuit_breaker.last_failure_time = datetime.now(timezone.utc)

        if _circuit_breaker.failure_count >= CIRCUIT_BREAKER_THRESHOLD:
            _circuit_breaker.is_open = True
            logger.warning(
                f"OIDC circuit breaker opened after {_circuit_breaker.failure_count} failures"
            )
_get_oidc_config function · python · L137-L177 (41 LOC)
api/auth/oidc.py
async def _get_oidc_config() -> OIDCConfig:
    """Get OIDC configuration from discovery endpoint (cached)."""
    global _oidc_config_cache

    if _oidc_config_cache:
        cache_age = (datetime.now(timezone.utc) - _oidc_config_cache.cached_at).total_seconds()
        if cache_age < OIDC_CONFIG_CACHE_TTL_SECONDS:
            return _oidc_config_cache

    if not OIDC_DISCOVERY_URL:
        raise HTTPException(status_code=500, detail="OIDC not configured")

    await _check_circuit_breaker()

    try:
        async with httpx.AsyncClient(timeout=OIDC_TIMEOUT_SECONDS) as client:
            response = await client.get(OIDC_DISCOVERY_URL)
            response.raise_for_status()
            config = response.json()
    except httpx.TimeoutException:
        await _record_failure()
        raise HTTPException(status_code=503, detail="OIDC provider timeout")
    except httpx.ConnectError:
        await _record_failure()
        raise HTTPException(status_code=503, detail="Cannot connect 
Same scanner, your repo: https://repobility.com — Repobility
_set_session_cookies function · python · L206-L236 (31 LOC)
api/auth/oidc.py
def _set_session_cookies(
    response: Response,
    session_token: str,
    refresh_token: str,
    expires_at: datetime,
    refresh_expires_at: datetime,
) -> None:
    """Set session and refresh token cookies."""
    now = datetime.now(timezone.utc)
    session_max_age = int((expires_at - now).total_seconds())
    refresh_max_age = int((refresh_expires_at - now).total_seconds())

    response.set_cookie(
        key=SESSION_COOKIE_NAME,
        value=session_token,
        max_age=session_max_age,
        httponly=True,
        secure=SECURE_COOKIES,
        samesite="lax",
        path="/",
    )

    response.set_cookie(
        key=REFRESH_COOKIE_NAME,
        value=refresh_token,
        max_age=refresh_max_age,
        httponly=True,
        secure=SECURE_COOKIES,
        samesite="lax",
        path="/api/v1/auth/refresh",
    )
get_oidc_status function · python · L245-L254 (10 LOC)
api/auth/oidc.py
async def get_oidc_status() -> OIDCStatusResponse:
    """
    Get OIDC configuration status.

    Returns whether OIDC is enabled and the provider name.
    """
    return OIDCStatusResponse(
        enabled=OIDC_ENABLED,
        provider_name=OIDC_PROVIDER_NAME if OIDC_ENABLED else "",
    )
initiate_oidc_authorize function · python · L258-L312 (55 LOC)
api/auth/oidc.py
async def initiate_oidc_authorize(
    request: Request,
    redirect_uri: str = Query(..., description="Where to redirect after auth"),
) -> OIDCAuthorizeResponse:
    """
    Initiate OIDC authorization flow.

    Generates state and nonce for security, stores them, and returns
    the URL to redirect the user to the OIDC provider.
    """
    if not OIDC_ENABLED:
        raise HTTPException(status_code=400, detail="OIDC is not enabled")

    config = await _get_oidc_config()

    # Generate cryptographic state and nonce
    state = secrets.token_urlsafe(32)
    nonce = secrets.token_urlsafe(32)

    now = datetime.now(timezone.utc)
    expires_at = now + timedelta(minutes=OIDC_STATE_EXPIRY_MINUTES)

    # Store state in database
    state_id = str(uuid.uuid4())
    await database.execute(
        oidc_states.insert().values(
            id=state_id,
            state=state,
            nonce=nonce,
            redirect_uri=redirect_uri,
            expires_at=expires_at,
           
‹ prevpage 5 / 20next ›