← back to filthyrake__vlog

Function bodies 1,000 total

All specs Real LLM only Function bodies
RequestIDMiddleware.dispatch method · python · L292-L327 (36 LOC)
api/common.py
    async def dispatch(self, request: Request, call_next: Callable) -> Response:
        # Defensive: clear any leaked context from previous request
        clear_request_context()

        # Use existing request ID from header, or generate a new one
        request_id = request.headers.get("X-Request-ID")
        if request_id:
            # Sanitize: limit length and allow only safe characters
            # This prevents log injection attacks and excessive memory usage
            request_id = request_id[:REQUEST_ID_MAX_LENGTH].strip()
            if not REQUEST_ID_PATTERN.match(request_id):
                request_id = None
        if not request_id:
            request_id = str(uuid.uuid4())

        # Store in request state for access by handlers
        request.state.request_id = request_id

        # Set logging context for structured logs (Issue #208)
        # User-Agent is sanitized to prevent log injection attacks
        set_request_context(
            request_id=request_i
get_request_id function · python · L330-L336 (7 LOC)
api/common.py
def get_request_id(request: Request) -> Optional[str]:
    """
    Look up the request ID stored on ``request.state``.

    Returns:
        The value of ``request.state.request_id``, or None when that
        attribute is not set (e.g. RequestIDMiddleware has not processed
        the request yet).
    """
    state = request.state
    try:
        return state.request_id
    except AttributeError:
        # Attribute was never assigned on the state object.
        return None
SecurityHeadersMiddleware.dispatch method · python · L342-L361 (20 LOC)
api/common.py
    async def dispatch(self, request: Request, call_next: Callable) -> Response:
        response = await call_next(request)
        # Prevent clickjacking (skip for embed pages - they use CSP frame-ancestors)
        if not request.url.path.startswith("/embed/"):
            response.headers["X-Frame-Options"] = "SAMEORIGIN"
        # Prevent MIME-type sniffing
        response.headers["X-Content-Type-Options"] = "nosniff"
        # XSS protection for legacy browsers
        response.headers["X-XSS-Protection"] = "1; mode=block"
        # Control referrer information
        response.headers["Referrer-Policy"] = "strict-origin-when-cross-origin"
        # Permissions policy (disable unnecessary browser features)
        response.headers["Permissions-Policy"] = "geolocation=(), microphone=(), camera=()"
        # Content Security Policy - restrict resource loading
        # Skip CSP for HTML pages (they have their own CSP meta tag with Alpine.js support)
        content_type = response
HTTPMetricsMiddleware.__init__ method · python · L381-L390 (10 LOC)
api/common.py
    def __init__(self, app: "ASGIApp", api_name: str):
        """
        Initialize the middleware.

        Args:
            app: The ASGI application to wrap
            api_name: Name of the API for labeling ("admin", "worker", "public")
        """
        self.app = app
        self.api_name = api_name
HTTPMetricsMiddleware.__call__ method · python · L392-L438 (47 LOC)
api/common.py
    async def __call__(self, scope: "Scope", receive: "Receive", send: "Send") -> None:
        """Process an ASGI request."""
        # Only process HTTP requests
        if scope["type"] != "http":
            await self.app(scope, receive, send)
            return

        # Import here to avoid circular imports
        from api.metrics import (
            HTTP_REQUEST_DURATION_SECONDS,
            HTTP_REQUESTS_IN_PROGRESS,
            HTTP_REQUESTS_TOTAL,
            normalize_endpoint,
        )

        # Increment in-progress gauge
        HTTP_REQUESTS_IN_PROGRESS.labels(api=self.api_name).inc()
        start_time = time.perf_counter()
        status_code = 500  # Default if exception occurs before response

        async def send_wrapper(message: dict) -> None:
            """Capture status code from response."""
            nonlocal status_code
            if message["type"] == "http.response.start":
                status_code = message["status"]
            await send(mes
rate_limit_exceeded_handler function · python · L441-L449 (9 LOC)
api/common.py
def rate_limit_exceeded_handler(request: Request, exc: RateLimitExceeded) -> JSONResponse:
    """
    Build the JSON 429 response returned when a rate limit is exceeded.

    Args:
        request: The incoming request (not used by this handler)
        exc: The rate-limit exception; its ``detail`` is stringified into
            the "error" field of the response body

    Returns:
        A JSONResponse with status 429 and "detail"/"error" fields
    """
    payload = {
        "detail": "Rate limit exceeded",
        "error": str(exc.detail),
    }
    return JSONResponse(status_code=429, content=payload)
_check_storage_sync function · python · L452-L478 (27 LOC)
api/common.py
def _check_storage_sync() -> bool:
    """
    Synchronous storage check that verifies both existence and writability.

    This runs in a thread pool to avoid blocking the event loop, and includes
    a write test to detect read-only mounts, permission issues, or full disks.

    Returns:
        True if VLOG_TEST_MODE is set, or if both storage directories exist
        and a probe file could be written to and removed from UPLOADS_DIR.
        False on any OSError or when a directory is missing.
    """
    import os

    # Skip storage check in test mode (CI doesn't have real storage)
    if os.environ.get("VLOG_TEST_MODE"):
        return True

    try:
        # Check directories exist
        if not VIDEOS_DIR.exists() or not UPLOADS_DIR.exists():
            return False

        # Test write capability by creating and removing a temp file
        # Use uploads dir since that's where new files arrive
        test_file = UPLOADS_DIR / f".health_check_{uuid.uuid4().hex}"
        try:
            test_file.write_text("health check")
        except OSError:
            # A failed write (e.g. disk full) can still leave an empty or
            # partial probe file behind; remove it on a best-effort basis so
            # repeated health checks do not accumulate stray files.
            try:
                test_file.unlink()
            except OSError:
                pass
            return False
        test_file.unlink()

        return True
    except OSError:
        # IOError is an alias of OSError and PermissionError is a subclass,
        # so OSError alone covers every case the check cares about.
        return False
Repobility — same analyzer, your code, free for public repos · /scan/
check_health function · python · L481-L532 (52 LOC)
api/common.py
async def check_health() -> dict:
    """
    Perform health checks for database and storage.

    Returns a dict with:
        - checks: dict of individual check results
        - healthy: bool indicating overall health
        - status_code: HTTP status code (200 if healthy, 503 if not)
    """
    checks = {
        "database": False,
        "storage": False,
    }

    # Check database connectivity
    try:
        await database.fetch_one("SELECT 1")
        checks["database"] = True
    except Exception as e:
        logger.warning(f"Database health check failed: {e}")

    # Check storage accessibility (NAS mount) with timeout
    # Uses a timeout to detect stale NFS mounts that would otherwise hang
    try:
        loop = asyncio.get_running_loop()
        checks["storage"] = await asyncio.wait_for(
            loop.run_in_executor(None, _check_storage_sync),
            timeout=STORAGE_CHECK_TIMEOUT,
        )
    except asyncio.TimeoutError:
        # Storage check timed out
check_storage_available function · python · L535-L583 (49 LOC)
api/common.py
async def check_storage_available() -> bool:
    """
    Check if storage is currently available, using cached status when recent.

    This is a fast check suitable for use in request handling. It uses a cached
    status within the TTL to avoid hammering the storage on every request.

    Returns:
        True if storage is available, False otherwise.
    """
    import os

    # Skip storage check in test mode (CI doesn't have real storage)
    if os.environ.get("VLOG_TEST_MODE"):
        return True

    now = datetime.now(timezone.utc)

    # Use lock to prevent race conditions with concurrent access
    async with _get_storage_health_lock():
        # Return cached status if recent
        if _storage_health_cache["last_check"] is not None:
            age = (now - _storage_health_cache["last_check"]).total_seconds()
            if age < STORAGE_HEALTH_CACHE_TTL:
                return _storage_health_cache["healthy"]

        # Perform a quick storage check (outside lock would a
require_storage_available function · python · L586-L603 (18 LOC)
api/common.py
async def require_storage_available():
    """
    FastAPI dependency that rejects the request when storage is unavailable.

    Attach it to endpoints that need storage access. It awaits
    ``check_storage_available()`` and returns None when that reports True.

    Raises:
        HTTPException: 503 with a ``Retry-After: 30`` header when storage
            is unavailable.

    Example:
        @app.get("/videos/{slug}/stream")
        async def stream_video(slug: str, _=Depends(require_storage_available)):
            ...
    """
    storage_ok = await check_storage_available()
    if storage_ok:
        return

    raise HTTPException(
        status_code=503,
        detail="Video storage temporarily unavailable. Please try again later.",
        headers={"Retry-After": "30"},
    )
get_storage_status function · python · L606-L623 (18 LOC)
api/common.py
def get_storage_status() -> dict:
    """
    Report the storage health currently held in the module-level cache.

    No storage check is performed here; the values are read as-is.

    Returns a dict with:
        - healthy: cached health value
        - last_check: ISO-formatted timestamp of the last check (or None)
        - last_error: cached error value (or None)
    """
    cache = _storage_health_cache
    checked_at = cache["last_check"]

    if checked_at:
        last_check_iso = checked_at.isoformat()
    else:
        last_check_iso = None

    status = {"healthy": cache["healthy"]}
    status["last_check"] = last_check_iso
    status["last_error"] = cache["last_error"]
    return status
configure_database function · python · L18-L24 (7 LOC)
api/database.py
async def configure_database():
    """
    Apply database-specific settings once a connection exists.

    Intentionally does nothing: PostgreSQL always enforces foreign key
    constraints, so there is nothing to configure.
    """
    # No-op by design - PostgreSQL needs no post-connection configuration.
    return None
create_tables function · python · L2014-L2021 (8 LOC)
api/database.py
def create_tables():
    """
    Create database tables directly using SQLAlchemy metadata.
    This creates all tables if they don't exist.

    A short-lived engine is built for DATABASE_URL, used for
    ``metadata.create_all``, and then disposed.

    Raises:
        Any exception raised by ``sa.create_engine`` or
        ``metadata.create_all``; the engine is disposed before a
        ``create_all`` failure propagates.
    """
    engine = sa.create_engine(DATABASE_URL)
    try:
        metadata.create_all(engine)
    finally:
        # Dispose even when table creation fails so the engine's pooled
        # connections are not left open.
        engine.dispose()
is_retryable_database_error function · python · L57-L107 (51 LOC)
api/db_retry.py
def is_retryable_database_error(exc: Exception) -> bool:
    """
    Check if an exception is a retryable database error.

    Supports both SQLite and PostgreSQL error patterns.
    """
    error_str = str(exc).lower()

    # SQLite error patterns
    sqlite_patterns = [
        "database is locked",
        "database table is locked",
        "sqlite_busy",
        "sqlite_locked",
    ]

    # PostgreSQL error patterns
    postgres_patterns = [
        "deadlock detected",  # 40P01
        "could not serialize access",  # 40001 serialization failure
        "could not obtain lock",  # Lock contention
        "connection refused",  # Transient connection error
        "connection reset",  # Connection dropped
        "server closed the connection unexpectedly",
        "canceling statement due to lock timeout",
        "lock timeout",
    ]

    # Check for SQLite errors
    for pattern in sqlite_patterns:
        if pattern in error_str:
            return True

    # Check for Post
is_deadlock_error function · python · L114-L137 (24 LOC)
api/db_retry.py
def is_deadlock_error(exc: Exception) -> bool:
    """
    Report whether an exception (or anything in its ``__cause__`` chain)
    is a deadlock error.

    Deadlocks get their own check because chronic deadlocks point at a
    deeper problem that retrying will not fix. (Issue #460)

    An exception counts as a deadlock when either:
        - its lowercased message contains "deadlock detected", or
        - it has a ``sqlstate`` attribute equal to "40P01"
          (PostgreSQL deadlock_detected).

    Args:
        exc: The exception to inspect

    Returns:
        True if the exception or one of its explicit causes matches
    """
    message = str(exc).lower()

    # Message match first; only fall through to the SQLSTATE code if needed.
    if "deadlock detected" in message or getattr(exc, "sqlstate", None) == "40P01":
        return True

    # Walk into the wrapped exception, if there is one.
    cause = getattr(exc, "__cause__", None)
    return cause is not None and is_deadlock_error(cause)
Repobility analyzer · published findings · https://repobility.com
execute_with_retry function · python · L152-L243 (92 LOC)
api/db_retry.py
async def execute_with_retry(
    func: Callable,
    *args,
    max_retries: int = DEFAULT_MAX_RETRIES,
    base_delay: float = DEFAULT_BASE_DELAY,
    max_delay: float = DEFAULT_MAX_DELAY,
    total_deadline: Optional[float] = DEFAULT_TOTAL_DEADLINE,
    **kwargs,
) -> T:
    """
    Execute an async function with retry logic for transient database errors.

    Uses exponential backoff with jitter to reduce contention.
    Deadlocks have a separate, lower retry limit to detect chronic issues early.

    Args:
        func: Async function to execute
        *args: Positional arguments for func
        max_retries: Maximum number of retry attempts
        base_delay: Initial delay between retries (seconds)
        max_delay: Maximum delay between retries (seconds)
        total_deadline: Maximum total time for all attempts (seconds). None for no limit.
        **kwargs: Keyword arguments for func

    Returns:
        Result of the function

    Raises:
        DeadlockError: If too ma
with_db_retry function · python · L246-L278 (33 LOC)
api/db_retry.py
def with_db_retry(
    max_retries: int = DEFAULT_MAX_RETRIES,
    base_delay: float = DEFAULT_BASE_DELAY,
    max_delay: float = DEFAULT_MAX_DELAY,
):
    """
    Decorator factory that routes an async function through
    ``execute_with_retry``.

    Args:
        max_retries: Forwarded to ``execute_with_retry`` as ``max_retries``
        base_delay: Forwarded to ``execute_with_retry`` as ``base_delay``
        max_delay: Forwarded to ``execute_with_retry`` as ``max_delay``

    Returns:
        A decorator producing an async wrapper with the wrapped function's
        metadata (via ``functools.wraps``).

    Usage:
        @with_db_retry()
        async def my_database_operation():
            ...

        @with_db_retry(max_retries=10)
        async def critical_operation():
            ...
    """
    # Retry settings are fixed at decoration time and reused for every call.
    retry_options = {
        "max_retries": max_retries,
        "base_delay": base_delay,
        "max_delay": max_delay,
    }

    def apply_retry(target: Callable) -> Callable:
        @functools.wraps(target)
        async def retrying_call(*call_args, **call_kwargs):
            outcome = await execute_with_retry(
                target,
                *call_args,
                **retry_options,
                **call_kwargs,
            )
            return outcome

        return retrying_call

    return apply_retry
fetch_one_with_retry function · python · L286-L335 (50 LOC)
api/db_retry.py
async def fetch_one_with_retry(
    query,
    max_retries: int = DEFAULT_MAX_RETRIES,
    base_delay: float = DEFAULT_BASE_DELAY,
    max_delay: float = DEFAULT_MAX_DELAY,
    timeout: float = DEFAULT_QUERY_TIMEOUT,
):
    """
    Execute a fetch_one query with retry logic for transient database errors.

    Args:
        query: SQLAlchemy query to execute
        max_retries: Maximum number of retry attempts
        base_delay: Initial delay between retries (seconds)
        max_delay: Maximum delay between retries (seconds)
        timeout: Query timeout in seconds (Issue #549)

    Returns:
        The query result (single row or None)

    Raises:
        DatabaseTimeoutError: If query times out
        DatabaseRetryableError: If all retries are exhausted
    """
    from api.database import database

    async def _fetch():
        start_time = time.monotonic()
        try:
            result = await asyncio.wait_for(
                database.fetch_one(query),
                tim
fetch_all_with_retry function · python · L338-L387 (50 LOC)
api/db_retry.py
async def fetch_all_with_retry(
    query,
    max_retries: int = DEFAULT_MAX_RETRIES,
    base_delay: float = DEFAULT_BASE_DELAY,
    max_delay: float = DEFAULT_MAX_DELAY,
    timeout: float = DEFAULT_QUERY_TIMEOUT,
):
    """
    Execute a fetch_all query with retry logic for transient database errors.

    Args:
        query: SQLAlchemy query to execute
        max_retries: Maximum number of retry attempts
        base_delay: Initial delay between retries (seconds)
        max_delay: Maximum delay between retries (seconds)
        timeout: Query timeout in seconds (Issue #549)

    Returns:
        The query result (list of rows)

    Raises:
        DatabaseTimeoutError: If query times out
        DatabaseRetryableError: If all retries are exhausted
    """
    from api.database import database

    async def _fetch():
        start_time = time.monotonic()
        try:
            result = await asyncio.wait_for(
                database.fetch_all(query),
                timeout=t
fetch_val_with_retry function · python · L390-L439 (50 LOC)
api/db_retry.py
async def fetch_val_with_retry(
    query,
    max_retries: int = DEFAULT_MAX_RETRIES,
    base_delay: float = DEFAULT_BASE_DELAY,
    max_delay: float = DEFAULT_MAX_DELAY,
    timeout: float = DEFAULT_QUERY_TIMEOUT,
):
    """
    Execute a fetch_val query with retry logic for transient database errors.

    Args:
        query: SQLAlchemy query to execute
        max_retries: Maximum number of retry attempts
        base_delay: Initial delay between retries (seconds)
        max_delay: Maximum delay between retries (seconds)
        timeout: Query timeout in seconds (Issue #549)

    Returns:
        The query result (single scalar value or None)

    Raises:
        DatabaseTimeoutError: If query times out
        DatabaseRetryableError: If all retries are exhausted
    """
    from api.database import database

    async def _fetch():
        start_time = time.monotonic()
        try:
            result = await asyncio.wait_for(
                database.fetch_val(query),
          
db_execute_with_retry function · python · L442-L511 (70 LOC)
api/db_retry.py
async def db_execute_with_retry(
    query,
    values=None,
    max_retries: int = DEFAULT_MAX_RETRIES,
    base_delay: float = DEFAULT_BASE_DELAY,
    max_delay: float = DEFAULT_MAX_DELAY,
    timeout: float = DEFAULT_QUERY_TIMEOUT,
):
    """
    Execute a database write query with retry logic for transient database errors.

    IMPORTANT: Timeouts are NOT retried for write operations because the write
    may have committed on the database side. Retrying could cause duplicates
    or data corruption. (Issue #549 review feedback)

    Args:
        query: SQLAlchemy query to execute
        values: Optional values dict for the query
        max_retries: Maximum number of retry attempts
        base_delay: Initial delay between retries (seconds)
        max_delay: Maximum delay between retries (seconds)
        timeout: Query timeout in seconds (Issue #549)

    Returns:
        The query result (typically row ID for inserts)

    Raises:
        DatabaseTimeoutError: If query times 
truncate_string function · python · L53-L74 (22 LOC)
api/errors.py
def truncate_string(text: Optional[str], max_length: int) -> Optional[str]:
    """
    Truncate a string to a maximum length.

    Generic string truncation utility that can be used for any text,
    not just error messages.

    Args:
        text: The text to truncate (can be None)
        max_length: Maximum length of the result. Values below 4 leave no
            room for an ellipsis, so the text is cut hard instead. Negative
            values are treated as 0.

    Returns:
        The text unchanged if it already fits, the truncated text with "..."
        appended if it was shortened (hard cut when max_length < 4), or None
        if input was None
    """
    if text is None:
        return None

    # A negative limit would make the slice below count from the END of the
    # string (e.g. text[:-1]) and return almost the whole text; clamp to 0.
    limit = max(max_length, 0)

    if len(text) <= limit:
        return text
    # Ensure we have enough space for ellipsis (...)
    if limit < 4:
        return text[:limit]
    return text[: limit - 3] + "..."
truncate_error function · python · L77-L88 (12 LOC)
api/errors.py
def truncate_error(msg: Optional[str], max_length: int = ERROR_DETAIL_MAX_LENGTH) -> Optional[str]:
    """
    Shorten an error message so it fits within ``max_length`` characters.

    Thin wrapper that delegates to ``truncate_string``.

    Args:
        msg: The error message to truncate (can be None)
        max_length: Maximum length (default: ERROR_DETAIL_MAX_LENGTH)

    Returns:
        Whatever ``truncate_string(msg, max_length)`` returns: the message,
        possibly shortened with "..." appended, or None if input was None
    """
    shortened = truncate_string(msg, max_length)
    return shortened
Repobility · code-quality intelligence platform · https://repobility.com
sanitize_error_message function · python · L92-L174 (83 LOC)
api/errors.py
def sanitize_error_message(
    error: Optional[str],
    logging_mode: Union[ErrorLogging, bool] = ErrorLogging.LOG_ORIGINAL,
    context: str = "",
) -> Optional[str]:
    """
    Sanitize an error message for safe display to API clients.

    Args:
        error: The original error message (may contain internal details)
        logging_mode: Whether to log the original message before sanitizing.
            Use ErrorLogging.LOG_ORIGINAL or ErrorLogging.SKIP_LOGGING.
            Boolean values are deprecated but supported for backwards compatibility.
        context: Additional context for logging (e.g., "video_id=123")

    Returns:
        A sanitized, user-friendly error message, or None if input was None
    """
    if error is None:
        return None

    # Handle backwards compatibility with boolean values
    if isinstance(logging_mode, bool):
        warnings.warn(
            "Passing boolean to sanitize_error_message() is deprecated. "
            "Use ErrorLogging.LOG_OR
is_unique_violation function · python · L177-L208 (32 LOC)
api/errors.py
def is_unique_violation(exc: Exception, column: Optional[str] = None) -> bool:
    """
    Check if an exception is a unique constraint violation.

    Supports both SQLite and PostgreSQL error formats:
    - SQLite: "UNIQUE constraint failed: table.column"
    - PostgreSQL: "duplicate key value violates unique constraint"

    Args:
        exc: The exception to check
        column: Optional column name to check for specific constraint

    Returns:
        True if this is a unique constraint violation (optionally on the specified column)
    """
    error_str = str(exc).lower()

    # Check for SQLite-style errors
    is_sqlite_unique = "unique constraint failed" in error_str

    # Check for PostgreSQL-style errors
    is_postgres_unique = "duplicate key value violates unique constraint" in error_str
    is_postgres_unique = is_postgres_unique or "uniqueviolation" in error_str

    if not (is_sqlite_unique or is_postgres_unique):
        return False

    # If a column name is spec
sanitize_progress_error function · python · L211-L241 (31 LOC)
api/errors.py
def sanitize_progress_error(error: Optional[str]) -> Optional[str]:
    """
    Sanitize error messages specifically for transcoding progress responses.
    These are shown in the admin UI during video processing.

    Args:
        error: The original error from transcoding job

    Returns:
        A sanitized error message suitable for display
    """
    if error is None:
        return None

    # For progress errors, we can be slightly more specific
    error_lower = error.lower()

    if "timeout" in error_lower:
        return "Processing timed out"

    if "all" in error_lower and "failed" in error_lower:
        return "All quality variants failed to process"

    if "retry" in error_lower or "attempt" in error_lower:
        return "Processing failed, retrying..."

    if "max" in error_lower and "exceeded" in error_lower:
        return "Maximum retry attempts exceeded"

    # Fall back to general sanitization
    return sanitize_error_message(error, ErrorLogging.SKIP_LOGGI
handle_api_exceptions function · python · L19-L60 (42 LOC)
api/exception_utils.py
def handle_api_exceptions(
    operation_name: str,
    error_detail: str = "Internal server error",
    status_code: int = 500,
    log_errors: bool = True,
) -> Callable[[Callable[..., T]], Callable[..., T]]:
    """
    Decorator for standardized exception handling in API endpoints.

    This ensures:
    1. HTTPExceptions are always re-raised (never masked)
    2. Specific domain errors can be caught and converted to appropriate HTTP errors
    3. Generic exceptions are logged and converted to 500 errors with sanitized messages

    Args:
        operation_name: Name of the operation for logging context
        error_detail: Default error message for generic exceptions
        status_code: Default status code for generic exceptions
        log_errors: Whether to log exceptions (default: True)

    Example:
        @handle_api_exceptions("video_upload", "Failed to upload video", 500)
        async def upload_video(...):
            # Your code here
            pass
    """
    def d
log_and_raise_http_exception function · python · L63-L93 (31 LOC)
api/exception_utils.py
def log_and_raise_http_exception(
    exception: Exception,
    status_code: int,
    detail: str,
    operation_name: Optional[str] = None,
    log_level: str = "error",
) -> None:
    """
    Log an exception and raise an HTTPException with sanitized message.

    Args:
        exception: The original exception
        status_code: HTTP status code for the response
        detail: User-facing error message (should be sanitized)
        operation_name: Optional operation name for logging context
        log_level: Logging level (default: "error")

    Example:
        try:
            result = await database_operation()
        except DatabaseError as e:
            log_and_raise_http_exception(
                e, 500, "Database error occurred", "save_video"
            )
    """
    log_msg = f"Error in {operation_name}: {exception}" if operation_name else str(exception)

    log_func = getattr(logger, log_level, logger.error)
    log_func(log_msg)

    raise HTTPException(status_cod
JobDispatch.to_stream_dict method · python · L69-L81 (13 LOC)
api/job_queue.py
    def to_stream_dict(self) -> dict:
        """Convert to Redis stream message format (all string values)."""
        return {
            "job_id": str(self.job_id),
            "video_id": str(self.video_id),
            "video_slug": self.video_slug,
            "source_filename": self.source_filename or "",
            "source_width": str(self.source_width or 0),
            "source_height": str(self.source_height or 0),
            "duration": str(self.duration or 0),
            "priority": self.priority,
            "created_at": (self.created_at or datetime.now(timezone.utc)).isoformat(),
        }
JobDispatch.from_stream_dict method · python · L84-L146 (63 LOC)
api/job_queue.py
    def from_stream_dict(cls, data: dict, message_id: str = None, stream_name: str = None) -> "JobDispatch":
        """Create from Redis stream message.

        Args:
            data: Message data dictionary from Redis stream
            message_id: Redis message ID for acknowledgment
            stream_name: Source stream name for acknowledgment

        Returns:
            JobDispatch instance

        Raises:
            ValueError: If required fields are missing or have invalid format
        """
        # Validate required fields exist
        if "job_id" not in data or "video_id" not in data or "video_slug" not in data:
            missing = [f for f in ("job_id", "video_id", "video_slug") if f not in data]
            raise ValueError(f"Missing required fields in job message: {missing}")

        try:
            job_id = int(data["job_id"])
            video_id = int(data["video_id"])
        except (ValueError, TypeError) as e:
            raise ValueError(f"Invalid job_id
JobQueue.initialize method · python · L163-L201 (39 LOC)
api/job_queue.py
    async def initialize(self, consumer_name: str) -> None:
        """
        Initialize the job queue for a worker.

        Args:
            consumer_name: Unique name for this consumer (e.g., worker-abc123)
        """
        self._consumer_name = consumer_name

        if JOB_QUEUE_MODE not in ("redis", "hybrid"):
            logger.info("Job queue mode: database (polling)")
            return

        redis = await get_redis()
        if not redis:
            if JOB_QUEUE_MODE == "redis":
                logger.warning("Redis required but unavailable, jobs will not be claimed")
            else:
                logger.info("Redis unavailable, using database polling fallback")
            return

        try:
            # Create consumer groups for all priority streams
            for priority, stream_name in PRIORITY_STREAMS.items():
                try:
                    await redis.xgroup_create(stream_name, REDIS_CONSUMER_GROUP, id="0", mkstream=True)
                  
Source: Repobility analyzer · https://repobility.com
JobQueue.publish_job method · python · L208-L236 (29 LOC)
api/job_queue.py
    async def publish_job(self, job: JobDispatch) -> bool:
        """
        Add a job to the Redis stream matching its priority.

        Jobs whose priority has no entry in PRIORITY_STREAMS go to the
        "normal" stream.

        Args:
            job: Job dispatch information

        Returns:
            True if the job was written to Redis. False when the queue mode
            is "database", when no Redis client is available, or when the
            write fails (the failure is logged as a warning).
        """
        # Database-only mode never touches Redis.
        if JOB_QUEUE_MODE == "database":
            return False

        client = await get_redis()
        if not client:
            return False

        try:
            target_stream = PRIORITY_STREAMS.get(job.priority, PRIORITY_STREAMS["normal"])
            payload = job.to_stream_dict()
            await client.xadd(target_stream, payload, maxlen=REDIS_STREAM_MAX_LEN)
            logger.debug(f"Published job {job.job_id} to {target_stream}")
        except (RedisError, OSError, TimeoutError) as err:
            logger.warning(f"Failed to publish job to Redis: {err}")
            return False

        return True
JobQueue.claim_job method · python · L238-L278 (41 LOC)
api/job_queue.py
    async def claim_job(self) -> Optional[JobDispatch]:
        """
        Claim a job from the Redis queue.

        Checks priority streams in order (high -> normal -> low).
        Also recovers abandoned messages from crashed workers.

        Returns:
            JobDispatch if a job was claimed, None if no jobs available
        """
        if not self.is_redis_enabled or not self._consumer_name:
            return None

        redis = await get_redis()
        if not redis:
            # Redis temporarily unavailable; RedisClient handles recovery via circuit breaker
            return None

        try:
            # Rate-limit abandoned message checking (Issue #429)
            # Only check every ABANDONED_CHECK_INTERVAL_SECONDS to reduce Redis round-trips
            now = time.monotonic()
            if now - self._last_abandoned_check >= ABANDONED_CHECK_INTERVAL_SECONDS:
                self._last_abandoned_check = now
                recovered = await self._recover_abando
JobQueue._recover_abandoned_messages method · python · L280-L323 (44 LOC)
api/job_queue.py
    async def _recover_abandoned_messages(self, redis) -> Optional[JobDispatch]:
        """Check for and recover abandoned messages from crashed workers."""
        for priority in STREAM_PRIORITIES:
            stream_name = PRIORITY_STREAMS[priority]
            try:
                # Get pending messages that have been idle too long
                pending = await redis.xpending_range(
                    stream_name,
                    REDIS_CONSUMER_GROUP,
                    min="-",
                    max="+",
                    count=10,
                )

                for msg in pending:
                    idle_time = msg.get("time_since_delivered", 0)
                    if idle_time > REDIS_PENDING_TIMEOUT_MS:
                        # Claim this abandoned message
                        claimed = await redis.xclaim(
                            stream_name,
                            REDIS_CONSUMER_GROUP,
                            self._consumer_name,
            
JobQueue._read_from_stream method · python · L325-L354 (30 LOC)
api/job_queue.py
    async def _read_from_stream(self, redis, stream_name: str) -> Optional[JobDispatch]:
        """Read a new message from a specific stream."""
        try:
            messages = await redis.xreadgroup(
                REDIS_CONSUMER_GROUP,
                self._consumer_name,
                {stream_name: ">"},
                count=1,
                block=REDIS_CONSUMER_BLOCK_MS,
            )

            if messages:
                # messages format: [[stream_name, [(message_id, data), ...]]]
                stream, msg_list = messages[0]
                if msg_list:
                    message_id, data = msg_list[0]
                    try:
                        return JobDispatch.from_stream_dict(data, message_id=message_id, stream_name=stream_name)
                    except ValueError as e:
                        # Malformed message - acknowledge it to remove from stream
                        logger.error(f"Malformed job message in {stream_name} (id={message_id}): {e}
JobQueue.acknowledge_job method · python · L356-L379 (24 LOC)
api/job_queue.py
    async def acknowledge_job(self, job: JobDispatch) -> bool:
        """
        Send an XACK to Redis for the stream message this job came from.

        Args:
            job: The completed job

        Returns:
            True if the XACK call completed. False when the job carries no
            message ID or stream name, when no Redis client is available,
            or when the call fails (the failure is logged as a warning).
        """
        # Without both identifiers there is no stream message to acknowledge.
        if not (job._message_id and job._stream_name):
            return False

        client = await get_redis()
        if not client:
            return False

        try:
            await client.xack(job._stream_name, REDIS_CONSUMER_GROUP, job._message_id)
            logger.debug(f"Acknowledged job {job.job_id}")
        except (RedisError, OSError, TimeoutError) as err:
            logger.warning(f"Failed to acknowledge job {job.job_id}: {err}")
            return False

        return True
JobQueue.reject_job method · python · L381-L421 (41 LOC)
api/job_queue.py
    async def reject_job(self, job: JobDispatch, error: str) -> bool:
        """
        Reject a job and move it to dead letter stream.

        Uses a Redis pipeline to ensure atomic operation - either both
        the DLQ entry is added AND the original message is acknowledged,
        or neither operation occurs.

        Args:
            job: The failed job
            error: Error message

        Returns:
            True if moved to DLQ, False otherwise
        """
        if not job._message_id or not job._stream_name:
            return False

        redis = await get_redis()
        if not redis:
            return False

        try:
            # Prepare DLQ data
            dlq_data = job.to_stream_dict()
            dlq_data["error"] = error[:500]
            dlq_data["failed_at"] = datetime.now(timezone.utc).isoformat()
            dlq_data["original_stream"] = job._stream_name

            # Use pipeline for atomic operation
            # This ensures both operation
JobQueue.get_queue_stats method · python · L423-L458 (36 LOC)
api/job_queue.py
    async def get_queue_stats(self) -> dict:
        """
        Get queue statistics.

        Returns:
            Dict with stream lengths and pending counts
        """
        redis = await get_redis()
        if not redis:
            return {"available": False}

        stats = {"available": True, "streams": {}}

        try:
            for priority, stream_name in PRIORITY_STREAMS.items():
                try:
                    length = await redis.xlen(stream_name)
                    pending_info = await redis.xpending(stream_name, REDIS_CONSUMER_GROUP)
                    stats["streams"][priority] = {
                        "length": length,
                        "pending": pending_info.get("pending", 0) if pending_info else 0,
                    }
                except (RedisError, OSError, TimeoutError):
                    stats["streams"][priority] = {"length": 0, "pending": 0}

            # Dead letter queue
            try:
                dlq_length = await 
_get_job_queue_init_lock function · python · L467-L491 (25 LOC)
api/job_queue.py
def _get_job_queue_init_lock() -> asyncio.Lock:
    """Return the module-wide lock guarding job queue initialization.

    The lock is built on first use rather than at import time. On later
    calls, if an event loop is running and the stored lock's private
    ``_loop`` attribute points at a different loop, the stored lock is
    replaced with a fresh one so callers never await a lock tied to
    another loop (e.g. across tests or application restarts).

    Returns:
        The current initialization lock (possibly newly created).
    """
    global _job_queue_init_lock

    existing = _job_queue_init_lock
    if existing is None:
        # First call: nothing to compare against, just create the lock.
        _job_queue_init_lock = asyncio.Lock()
        return _job_queue_init_lock

    try:
        running_loop = asyncio.get_running_loop()
        bound_loop = getattr(existing, "_loop", None)
        # A lock with no recorded loop is reusable as-is; only a lock that
        # recorded some *other* loop has to be thrown away.
        is_stale = bound_loop is not None and bound_loop is not running_loop
        if is_stale:
            _job_queue_init_lock = asyncio.Lock()
    except RuntimeError:
        # get_running_loop() raises when no loop is running; keep the
        # existing lock in that case.
        pass

    return _job_queue_init_lock
Repobility — same analyzer, your code, free for public repos · /scan/
get_job_queue function · python · L494-L504 (11 LOC)
api/job_queue.py
async def get_job_queue() -> JobQueue:
    """Return the shared JobQueue, creating and initializing it on demand.

    Creation and initialization both happen while holding the lock from
    ``_get_job_queue_init_lock()``. Initialization is performed with
    ``consumer_name="api-publisher"`` and is skipped once the
    ``_job_queue_initialized`` flag has been set.

    Returns:
        The global JobQueue instance.
    """
    global _job_queue, _job_queue_initialized

    init_lock = _get_job_queue_init_lock()
    async with init_lock:
        queue = _job_queue
        if queue is None:
            queue = JobQueue()
            _job_queue = queue

        if not _job_queue_initialized:
            # API side only publishes; no consumer operations are needed.
            await queue.initialize(consumer_name="api-publisher")
            # Flag is set only after initialize() returns without raising,
            # so a failed attempt is retried on the next call.
            _job_queue_initialized = True

    return _job_queue
_ensure_utc_datetime function · python · L93-L109 (17 LOC)
api/job_state.py
def _ensure_utc_datetime(dt: Optional[datetime]) -> Optional[datetime]:
    """
    Convert a datetime to a UTC-aware datetime.

    Args:
        dt: A naive or timezone-aware datetime, or None

    Returns:
        None when ``dt`` is None. For a datetime that carries ``tzinfo``,
        the same instant expressed in UTC. For a naive datetime, the same
        wall-clock value tagged as UTC (a warning is logged).
    """
    if dt is None:
        return None

    if dt.tzinfo is not None:
        # Already carries tzinfo: shift the representation to UTC.
        return dt.astimezone(timezone.utc)

    # No tzinfo available: treat the value as UTC and make that visible in logs.
    logger.warning(f"Naive datetime detected, assuming UTC: {dt}")
    return dt.replace(tzinfo=timezone.utc)
JobRow.from_mapping method · python · L131-L159 (29 LOC)
api/job_state.py
    def from_mapping(cls, row: Mapping[str, Any]) -> "JobRow":
        """
        Create JobRow from a database row mapping.

        Normalizes datetime fields to UTC and validates numeric fields.
        """
        # Normalize datetimes to UTC
        claimed_at = _ensure_utc_datetime(row.get("claimed_at"))
        claim_expires_at = _ensure_utc_datetime(row.get("claim_expires_at"))
        completed_at = _ensure_utc_datetime(row.get("completed_at"))

        # Get numeric fields with defaults, ensure minimum values
        attempt_number = row.get("attempt_number", 1)
        max_attempts = row.get("max_attempts", 3)

        # Ensure valid ranges (minimum 1 for both)
        if attempt_number is None or attempt_number < 1:
            attempt_number = 1
        if max_attempts is None or max_attempts < 1:
            max_attempts = 3

        return cls(
            claimed_at=claimed_at,
            claim_expires_at=claim_expires_at,
            completed_at=completed_at,
      
TranscodingJobStateMachine._validate_sql_identifier method · python · L205-L220 (16 LOC)
api/job_state.py
    def _validate_sql_identifier(self, value: str, param_name: str) -> None:
        """
        Reject strings that are not acceptable as SQL identifiers.

        The candidate is checked with ``match`` against the module-level
        ``_SAFE_SQL_IDENTIFIER`` pattern.

        Args:
            value: The identifier to check
            param_name: Label for the value, used only in the error message

        Raises:
            ValueError: If ``value`` does not match the safe-identifier pattern
        """
        if _SAFE_SQL_IDENTIFIER.match(value):
            return

        message = (
            f"Invalid SQL identifier for {param_name}: '{value}'. "
            f"Must match pattern: [a-zA-Z_][a-zA-Z0-9_]*"
        )
        raise ValueError(message)
TranscodingJobStateMachine._validate_sql_param method · python · L222-L237 (16 LOC)
api/job_state.py
    def _validate_sql_param(self, value: str, param_name: str) -> None:
        """
        Reject strings that are not acceptable as SQL bind-parameter names.

        The candidate is checked with ``match`` against the module-level
        ``_SAFE_SQL_PARAM`` pattern.

        Args:
            value: The parameter name to check (e.g., ":now")
            param_name: Label for the value, used only in the error message

        Raises:
            ValueError: If ``value`` does not match the safe-parameter pattern
        """
        if _SAFE_SQL_PARAM.match(value):
            return

        message = (
            f"Invalid SQL parameter for {param_name}: '{value}'. "
            f"Must match pattern: :[a-zA-Z_][a-zA-Z0-9_]*"
        )
        raise ValueError(message)
TranscodingJobStateMachine.is_unclaimed method · python · L243-L255 (13 LOC)
api/job_state.py
    def is_unclaimed(self, job: Union[JobRow, Mapping[str, Any]]) -> bool:
        """
        Report whether the job is free for a worker to pick up.

        True only when all three of ``claimed_at``, ``completed_at`` and
        ``last_error`` are None, i.e. the job has no claim, is not finished,
        and carries no recorded error.
        """
        row = self._normalize_job(job)
        if row.claimed_at is not None:
            return False
        if row.completed_at is not None:
            return False
        return row.last_error is None
TranscodingJobStateMachine.is_claimed method · python · L257-L281 (25 LOC)
api/job_state.py
    def is_claimed(
        self, job: Union[JobRow, Mapping[str, Any]], current_time: Optional[datetime] = None
    ) -> bool:
        """
        Report whether a worker currently holds a live claim on the job.

        All of the following must hold:
        - claimed_at IS NOT NULL
        - completed_at IS NULL
        - claim_expires_at IS NOT NULL and strictly later than current_time

        Args:
            job: The job row to check
            current_time: Reference time for the expiry comparison.
                Defaults to the current UTC time.
        """
        row = self._normalize_job(job)
        now = current_time if current_time is not None else datetime.now(timezone.utc)

        if row.claimed_at is None or row.completed_at is not None:
            return False

        deadline = row.claim_expires_at
        # A claim that expires exactly at `now` is no longer active.
        return deadline is not None and deadline > now
TranscodingJobStateMachine.is_expired method · python · L283-L307 (25 LOC)
api/job_state.py
    def is_expired(
        self, job: Union[JobRow, Mapping[str, Any]], current_time: Optional[datetime] = None
    ) -> bool:
        """
        Report whether the job's claim has lapsed so it can be reclaimed.

        All of the following must hold:
        - claimed_at IS NOT NULL
        - completed_at IS NULL
        - claim_expires_at IS NOT NULL and at or before current_time

        Args:
            job: The job row to check
            current_time: Reference time for the expiry comparison.
                Defaults to the current UTC time.
        """
        row = self._normalize_job(job)
        now = current_time if current_time is not None else datetime.now(timezone.utc)

        if row.claimed_at is None or row.completed_at is not None:
            return False

        deadline = row.claim_expires_at
        # Expiry is inclusive: a deadline equal to `now` counts as expired.
        return deadline is not None and deadline <= now
Repobility analyzer · published findings · https://repobility.com
TranscodingJobStateMachine.is_completed method · python · L309-L316 (8 LOC)
api/job_state.py
    def is_completed(self, job: Union[JobRow, Mapping[str, Any]]) -> bool:
        """
        Report whether the job has a completion timestamp.

        True exactly when completed_at IS NOT NULL on the normalized job.
        """
        return self._normalize_job(job).completed_at is not None
TranscodingJobStateMachine.is_failed method · python · L318-L332 (15 LOC)
api/job_state.py
    def is_failed(self, job: Union[JobRow, Mapping[str, Any]]) -> bool:
        """
        Report whether the job has failed with no retries left.

        All of the following must hold:
        - completed_at IS NULL
        - last_error IS NOT NULL
        - attempt_number >= max_attempts
        """
        row = self._normalize_job(job)
        if row.completed_at is not None or row.last_error is None:
            return False
        # Attempts are exhausted once the counter reaches the limit.
        return row.attempt_number >= row.max_attempts
TranscodingJobStateMachine.is_retrying method · python · L334-L350 (17 LOC)
api/job_state.py
    def is_retrying(self, job: Union[JobRow, Mapping[str, Any]]) -> bool:
        """
        Report whether the job errored but may still be attempted again.

        All of the following must hold:
        - completed_at IS NULL
        - last_error IS NOT NULL
        - attempt_number < max_attempts
        - claimed_at IS NULL
        """
        row = self._normalize_job(job)
        if row.completed_at is not None or row.last_error is None:
            return False

        attempts_remain = row.attempt_number < row.max_attempts
        return attempts_remain and row.claimed_at is None
‹ prevpage 7 / 20next ›