← back to jshudzina__PitLane-AI

Function bodies 217 total

All specs Real LLM only Function bodies
is_safe_filename function · python · L26-L51 (26 LOC)
packages/pitlane-web/src/pitlane_web/security.py
def is_safe_filename(filename: str) -> bool:
    """Validate filename using whitelist pattern to prevent path traversal.

    Only ASCII letters, digits, underscores, hyphens, and dots are accepted.
    Anything outside that set (path separators, ``%`` used by URL encoding,
    whitespace, newlines, non-ASCII characters) causes rejection. Names that
    start or end with a dot, or contain ``..``, are rejected as well.

    Args:
        filename: The filename to validate

    Returns:
        True if filename is safe, False otherwise
    """
    if not filename:
        return False

    # Whitelist pattern: only allow safe characters.
    # fullmatch() is used instead of match() with a "$" anchor because "$"
    # also matches just before a trailing newline, which would let a name
    # such as "chart.png\n" slip through the whitelist.
    if not re.fullmatch(r"[a-zA-Z0-9_.-]+", filename):
        return False

    # Leading dot would be a hidden file; trailing dot is an empty extension.
    if filename.startswith(".") or filename.endswith("."):
        return False

    # Double dots still not allowed (prevents traversal attacks)
    return ".." not in filename
validate_file_path function · python · L54-L79 (26 LOC)
packages/pitlane-web/src/pitlane_web/security.py
def validate_file_path(file_path: Path, workspace_path: Path) -> bool:
    """Validate that resolved file path is within workspace directory.

    Resolves symlinks and verifies the file is within the workspace boundaries.
    This prevents path traversal and symlink attacks.

    Args:
        file_path: The file path to validate
        workspace_path: The workspace directory path

    Returns:
        True if file is within workspace, False otherwise
    """
    try:
        # Reject broken symlinks - check if it's a symlink and the target doesn't exist
        if file_path.is_symlink() and not file_path.exists():
            return False

        resolved_file = file_path.resolve()
        resolved_workspace = workspace_path.resolve()

        # Check if the resolved file path starts with the workspace path
        return str(resolved_file).startswith(str(resolved_workspace))
    except (OSError, RuntimeError):
        # Handle errors during path resolution (broken symlinks, permis
is_allowed_file_extension function · python · L82-L92 (11 LOC)
packages/pitlane-web/src/pitlane_web/security.py
def is_allowed_file_extension(file_path: Path, allowed_extensions: set[str]) -> bool:
    """Report whether the path's final suffix is one of the permitted extensions.

    The suffix is lower-cased before the lookup; the entries of
    ``allowed_extensions`` are compared as given.

    Args:
        file_path: The file path to check
        allowed_extensions: Set of allowed file extensions (e.g., {".png", ".jpg"})

    Returns:
        True if extension is allowed, False otherwise
    """
    extension = file_path.suffix.lower()
    return extension in allowed_extensions
WorkspaceExistenceCache.__init__ method · python · L37-L45 (9 LOC)
packages/pitlane-web/src/pitlane_web/session.py
    def __init__(self, ttl: int = 60):
        """Initialize the cache.

        Args:
            ttl: Time-to-live in seconds for cache entries (default: 60)
        """
        self._cache: dict[str, tuple[bool, float]] = {}
        self._lock = Lock()
        self._ttl = ttl
WorkspaceExistenceCache.get method · python · L47-L63 (17 LOC)
packages/pitlane-web/src/pitlane_web/session.py
    def get(self, session_id: str) -> bool | None:
        """Get cached workspace existence result if still valid.

        Args:
            session_id: Session ID to check

        Returns:
            Cached result (True/False) if valid, None if expired or not cached
        """
        with self._lock:
            if session_id in self._cache:
                result, timestamp = self._cache[session_id]
                if time() - timestamp < self._ttl:
                    return result
                # Expired, remove from cache
                del self._cache[session_id]
        return None
WorkspaceExistenceCache.set method · python · L65-L73 (9 LOC)
packages/pitlane-web/src/pitlane_web/session.py
    def set(self, session_id: str, exists: bool) -> None:
        """Cache workspace existence result.

        Args:
            session_id: Session ID
            exists: Whether workspace exists
        """
        with self._lock:
            self._cache[session_id] = (exists, time())
WorkspaceExistenceCache.invalidate method · python · L75-L82 (8 LOC)
packages/pitlane-web/src/pitlane_web/session.py
    def invalidate(self, session_id: str) -> None:
        """Invalidate cached entry for session.

        Args:
            session_id: Session ID to invalidate
        """
        with self._lock:
            self._cache.pop(session_id, None)
Open data scored by Repobility · https://repobility.com
workspace_exists_cached function · python · L94-L109 (16 LOC)
packages/pitlane-web/src/pitlane_web/session.py
def workspace_exists_cached(session_id: str) -> bool:
    """Return whether a workspace exists, consulting the module cache first.

    On a cache miss the result of ``workspace_exists`` is computed, stored in
    the cache, and returned.

    Args:
        session_id: Session ID to check

    Returns:
        True if workspace exists, False otherwise
    """
    result = _workspace_cache.get(session_id)
    if result is None:
        # Miss (or expired entry): do the real check and remember the answer.
        result = workspace_exists(session_id)
        _workspace_cache.set(session_id, result)
    return result
validate_session_safely function · python · L112-L138 (27 LOC)
packages/pitlane-web/src/pitlane_web/session.py
def validate_session_safely(session: str | None) -> tuple[bool, str | None]:
    """Validate session with constant-time checks to prevent timing attacks.

    Performs validation checks in a consistent order regardless of where validation
    fails, making it harder for attackers to probe for valid session IDs.

    Args:
        session: Session ID from cookie (may be None)

    Returns:
        Tuple of (is_valid, session_id)
        - is_valid: True if session is valid and exists
        - session_id: The validated session ID if valid, None otherwise
    """
    # Always check format first (constant time for UUID validation)
    is_valid_format = is_valid_session_id(session) if session else False

    # Always check workspace existence (even if format invalid, to maintain constant timing)
    # This prevents attackers from using timing to determine if a UUID exists
    # Use cached version for better performance
    exists = workspace_exists_cached(session) if is_valid_format else F
update_workspace_metadata_safe function · python · L141-L154 (14 LOC)
packages/pitlane-web/src/pitlane_web/session.py
def update_workspace_metadata_safe(session_id: str) -> None:
    """Update workspace metadata, logging any failure instead of raising.

    A missing metadata file is logged as a warning; a permission problem and
    any other exception are logged as errors (the latter with traceback).

    Args:
        session_id: Session ID to update
    """
    try:
        update_workspace_metadata(session_id)
    except FileNotFoundError as missing:
        message = f"Workspace metadata file not found for session {session_id}: {missing}"
        logger.warning(message)
    except PermissionError as denied:
        message = f"Permission denied updating workspace metadata for session {session_id}: {denied}"
        logger.error(message)
    except Exception as unexpected:
        message = f"Unexpected error updating workspace metadata for session {session_id}: {unexpected}"
        logger.error(message, exc_info=True)
create_session_cookie_params function · python · L157-L168 (12 LOC)
packages/pitlane-web/src/pitlane_web/session.py
def create_session_cookie_params() -> dict:
    """Build the keyword arguments used when setting the session cookie.

    The values are read from the module's SESSION_* configuration constants.

    Returns:
        Dictionary of cookie parameters for set_cookie()
    """
    return dict(
        max_age=SESSION_MAX_AGE,
        secure=SESSION_COOKIE_SECURE,
        httponly=SESSION_COOKIE_HTTPONLY,
        samesite=SESSION_COOKIE_SAMESITE,
    )
set_session_cookie function · python · L171-L183 (13 LOC)
packages/pitlane-web/src/pitlane_web/session.py
def set_session_cookie(response: Response, session_id: str) -> None:
    """Attach the session cookie to a response.

    The cookie is named by SESSION_COOKIE_NAME and the remaining attributes
    come from ``create_session_cookie_params()``.

    Args:
        response: FastAPI Response object
        session_id: Session ID to set in cookie
    """
    response.set_cookie(key=SESSION_COOKIE_NAME, value=session_id, **create_session_cookie_params())
validate_version function · python · L29-L65 (37 LOC)
scripts/bump_version.py
def validate_version(version: str) -> str:
    """
    Validate and normalize PEP 440 version format.

    Supports Python's PEP 440 versioning including pre-release and local versions:
    - 0.2.0
    - 0.2.0a0 or 0.2.0alpha0 (alpha)
    - 0.2.0b1 or 0.2.0beta1 (beta)
    - 0.2.0rc1 (release candidate)
    - 0.2.0.post1 (post-release)
    - 0.2.0.dev0 (development)
    - 0.2.0+test (local version)

    Args:
        version: Version string (e.g., "0.2.0", "v0.2.0a0", "0.2.0rc1")

    Returns:
        Normalized version string without 'v' prefix

    Raises:
        ValueError: If version format is invalid per PEP 440
    """
    # Strip 'v' prefix if present
    normalized = version.lstrip("v")

    # Validate PEP 440 version format
    # Based on https://peps.python.org/pep-0440/
    # [N!]N(.N)*[{a|alpha|b|beta|rc}N][.postN][.devN][+local]
    pattern = r"^(\d+!)?\d+(\.\d+)*((a|alpha|b|beta|rc)\d+)?(\.post\d+)?(\.dev\d+)?(\+[a-zA-Z0-9]+(\.[a-zA-Z0-9]+)*)?$"
    if not re.match(patte
update_pyproject_toml function · python · L68-L95 (28 LOC)
scripts/bump_version.py
def update_pyproject_toml(file_path: Path, version: str) -> None:
    """
    Update version in pyproject.toml file.

    Rewrites the first line that begins with ``version = "..."`` so that it
    carries the new version, writes the file back, and prints a confirmation.

    Args:
        file_path: Path to pyproject.toml file
        version: New version string

    Raises:
        FileNotFoundError: If file doesn't exist
        ValueError: If version line not found
    """
    if not file_path.exists():
        raise FileNotFoundError(f"File not found: {file_path}")

    # TOML files are UTF-8 by specification; don't depend on the locale default.
    content = file_path.read_text(encoding="utf-8")

    # Match 'version = "X.Y.Z"' pattern
    pattern = r'^version = "[^"]*"'
    replacement = f'version = "{version}"'

    # A callable replacement inserts the text literally, so a backslash in
    # the version can never be interpreted as a group reference.
    new_content, count = re.subn(pattern, lambda _match: replacement, content, count=1, flags=re.MULTILINE)

    if count == 0:
        raise ValueError(f"Version line not found in {file_path}")

    file_path.write_text(new_content, encoding="utf-8")

    # relative_to() raises ValueError when the file is not under the current
    # directory. The file is already written at this point, so fall back to
    # the path as given instead of reporting a spurious failure.
    try:
        display_path = file_path.relative_to(Path.cwd())
    except ValueError:
        display_path = file_path
    print(f"✓ Updated {display_path} to version {version}")
update_init_py function · python · L98-L125 (28 LOC)
scripts/bump_version.py
def update_init_py(file_path: Path, version: str) -> None:
    """
    Update __version__ in __init__.py file.

    Rewrites the first line that begins with ``__version__ = "..."`` so that
    it carries the new version, writes the file back, and prints a
    confirmation.

    Args:
        file_path: Path to __init__.py file
        version: New version string

    Raises:
        FileNotFoundError: If file doesn't exist
        ValueError: If __version__ line not found
    """
    if not file_path.exists():
        raise FileNotFoundError(f"File not found: {file_path}")

    # Python source defaults to UTF-8; don't depend on the locale default.
    content = file_path.read_text(encoding="utf-8")

    # Match '__version__ = "X.Y.Z"' pattern
    pattern = r'^__version__ = "[^"]*"'
    replacement = f'__version__ = "{version}"'

    # A callable replacement inserts the text literally, so a backslash in
    # the version can never be interpreted as a group reference.
    new_content, count = re.subn(pattern, lambda _match: replacement, content, count=1, flags=re.MULTILINE)

    if count == 0:
        raise ValueError(f"__version__ line not found in {file_path}")

    file_path.write_text(new_content, encoding="utf-8")

    # relative_to() raises ValueError when the file is not under the current
    # directory. The file is already written at this point, so fall back to
    # the path as given instead of reporting a spurious failure.
    try:
        display_path = file_path.relative_to(Path.cwd())
    except ValueError:
        display_path = file_path
    print(f"✓ Updated {display_path} to version {version}")
Citation: Repobility (2026). State of AI-Generated Code. https://repobility.com/research/
bump_version function · python · L128-L170 (43 LOC)
scripts/bump_version.py
def bump_version(version: str) -> None:
    """
    Update version in all PitLane-AI package files.

    Args:
        version: New version string

    Raises:
        Exception: If any file update fails (all updates are rolled back)
    """
    # Get repository root
    repo_root = Path(__file__).parent.parent

    # Define files to update with their update functions
    files_to_update: list[tuple[Path, callable]] = [
        (repo_root / "pyproject.toml", update_pyproject_toml),
        (repo_root / "packages/pitlane-agent/pyproject.toml", update_pyproject_toml),
        (repo_root / "packages/pitlane-web/pyproject.toml", update_pyproject_toml),
        (repo_root / "packages/pitlane-agent/src/pitlane_agent/__init__.py", update_init_py),
        (repo_root / "packages/pitlane-web/src/pitlane_web/__init__.py", update_init_py),
    ]

    # Validate all files exist before making any changes
    print("Validating files...")
    for file_path, _ in files_to_update:
        if not file_p
main function · python · L173-L201 (29 LOC)
scripts/bump_version.py
def main() -> int:
    """
    Command-line entry point: validate the version argument and apply it.

    Expects exactly one command-line argument (the new version). Every
    failure is reported on stderr.

    Returns:
        Exit code (0 for success, 1 for failure)
    """
    cli_args = sys.argv[1:]
    if len(cli_args) != 1:
        usage_lines = (
            "Usage: python scripts/bump_version.py <version>",
            "Example: python scripts/bump_version.py 0.2.0",
        )
        for line in usage_lines:
            print(line, file=sys.stderr)
        return 1

    try:
        bump_version(validate_version(cli_args[0]))
    except ValueError as exc:
        failure = f"✗ Validation error: {exc}"
    except FileNotFoundError as exc:
        failure = f"✗ File error: {exc}"
    except Exception as exc:
        failure = f"✗ Unexpected error: {exc}"
    else:
        return 0

    print(failure, file=sys.stderr)
    return 1
‹ prevpage 5 / 5