← back to jshudzina__PitLane-AI

Function bodies 217 total

All specs Real LLM only Function bodies
_generate_title function · python · L449-L465 (17 LOC)
packages/pitlane-agent/src/pitlane_agent/commands/workspace/operations.py
def _generate_title(message: str, max_length: int = 50) -> str:
    """Generate a title from the first message.

    Args:
        message: The user's first message.
        max_length: Maximum title length.

    Returns:
        A truncated title suitable for display.
    """
    # Remove extra whitespace
    clean = " ".join(message.split())
    if len(clean) <= max_length:
        return clean
    # Truncate at word boundary
    truncated = clean[:max_length].rsplit(" ", 1)[0]
    return truncated + "..."
create_conversation function · python · L468-L504 (37 LOC)
packages/pitlane-agent/src/pitlane_agent/commands/workspace/operations.py
def create_conversation(
    workspace_id: str,
    agent_session_id: str,
    first_message: str,
) -> dict:
    """Create a new conversation entry.

    Args:
        workspace_id: Workspace identifier.
        agent_session_id: Claude SDK session ID for resumption.
        first_message: First user message (used for title/preview).

    Returns:
        The created conversation dict.

    Raises:
        ValueError: If workspace doesn't exist.
    """
    conv_id = f"conv_{uuid.uuid4().hex[:12]}"
    now = datetime.now(UTC).isoformat() + "Z"

    conversation = {
        "id": conv_id,
        "agent_session_id": agent_session_id,
        "title": _generate_title(first_message),
        "created_at": now,
        "last_message_at": now,
        "message_count": 1,
        "preview": first_message[:100] + ("..." if len(first_message) > 100 else ""),
    }

    data = load_conversations(workspace_id)
    data["conversations"].insert(0, conversation)  # Most recent first
    data["acti
update_conversation function · python · L507-L533 (27 LOC)
packages/pitlane-agent/src/pitlane_agent/commands/workspace/operations.py
def update_conversation(
    workspace_id: str,
    conversation_id: str,
    message_count_delta: int = 1,
) -> None:
    """Update conversation metadata after a message.

    Args:
        workspace_id: Workspace identifier.
        conversation_id: The conversation to update.
        message_count_delta: Number of messages to add to count.
    """
    data = load_conversations(workspace_id)

    found = False
    for conv in data["conversations"]:
        if conv["id"] == conversation_id:
            conv["last_message_at"] = datetime.now(UTC).isoformat() + "Z"
            conv["message_count"] += message_count_delta
            found = True
            break

    if not found:
        logger.warning(f"Conversation {conversation_id} not found in workspace {workspace_id}")
        return

    save_conversations(workspace_id, data)
get_active_conversation function · python · L536-L554 (19 LOC)
packages/pitlane-agent/src/pitlane_agent/commands/workspace/operations.py
def get_active_conversation(workspace_id: str) -> dict | None:
    """Get the currently active conversation for a workspace.

    Args:
        workspace_id: Workspace identifier.

    Returns:
        The active conversation dict, or None if no active conversation.
    """
    data = load_conversations(workspace_id)
    active_id = data.get("active_conversation_id")

    if not active_id:
        return None

    for conv in data["conversations"]:
        if conv["id"] == active_id:
            return conv
    return None
set_active_conversation function · python · L557-L566 (10 LOC)
packages/pitlane-agent/src/pitlane_agent/commands/workspace/operations.py
def set_active_conversation(workspace_id: str, conversation_id: str | None) -> None:
    """Set the active conversation for a workspace.

    Args:
        workspace_id: Workspace identifier.
        conversation_id: The conversation ID to set as active, or None to clear.
    """
    data = load_conversations(workspace_id)
    data["active_conversation_id"] = conversation_id
    save_conversations(workspace_id, data)
get_messages_path function · python · L574-L584 (11 LOC)
packages/pitlane-agent/src/pitlane_agent/commands/workspace/operations.py
def get_messages_path(workspace_id: str, conversation_id: str) -> Path:
    """Get path to the messages file for a specific conversation.

    Args:
        workspace_id: The workspace identifier.
        conversation_id: The conversation identifier.

    Returns:
        Path to the {conversation_id}.messages.json file.
    """
    return get_workspace_path(workspace_id) / f"{conversation_id}.messages.json"
save_message function · python · L587-L634 (48 LOC)
packages/pitlane-agent/src/pitlane_agent/commands/workspace/operations.py
def save_message(workspace_id: str, conversation_id: str, question: str, content: str) -> None:
    """Atomically append a message pair to the conversation's messages file.

    Args:
        workspace_id: Workspace identifier.
        conversation_id: The conversation to append to.
        question: The user's question.
        content: The assistant's response content.

    Raises:
        ValueError: If workspace doesn't exist.
    """
    if not workspace_exists(workspace_id):
        raise ValueError(f"Workspace does not exist for workspace ID: {workspace_id}")

    messages_path = get_messages_path(workspace_id, conversation_id)
    workspace_path = get_workspace_path(workspace_id)

    # Load existing messages
    messages = []
    if messages_path.exists():
        try:
            with open(messages_path) as f:
                messages = json.load(f)
        except (json.JSONDecodeError, OSError):
            messages = []

    messages.append(
        {
            "question"
Provenance: Repobility (https://repobility.com) — every score reproducible from /scan/
load_messages function · python · L637-L655 (19 LOC)
packages/pitlane-agent/src/pitlane_agent/commands/workspace/operations.py
def load_messages(workspace_id: str, conversation_id: str) -> list[dict]:
    """Load all message pairs for a conversation.

    Args:
        workspace_id: Workspace identifier.
        conversation_id: The conversation to load messages for.

    Returns:
        List of message dicts with 'question', 'content', and 'timestamp' keys.
        Returns empty list if no messages have been saved yet.
    """
    messages_path = get_messages_path(workspace_id, conversation_id)
    if not messages_path.exists():
        return []
    try:
        with open(messages_path) as f:
            return json.load(f)
    except (json.JSONDecodeError, OSError):
        return []
TemporalAnalyzer.analyze method · python · L25-L82 (58 LOC)
packages/pitlane-agent/src/pitlane_agent/temporal/analyzer.py
    def analyze(self, current_time: datetime) -> TemporalContext:
        """Analyze schedule and determine current temporal state.

        Args:
            current_time: Current time (UTC)

        Returns:
            Complete temporal context
        """
        # Ensure timezone awareness
        if current_time.tzinfo is None:
            current_time = current_time.replace(tzinfo=UTC)

        # Determine season year
        current_season = self._determine_season_year(current_time)

        # Fetch schedule
        try:
            schedule = fastf1.get_event_schedule(current_season, include_testing=False)
        except (ValueError, KeyError, AttributeError, ImportError):
            # If current season schedule isn't available, try previous year
            current_season = current_time.year - 1
            schedule = fastf1.get_event_schedule(current_season, include_testing=False)

        # Convert schedule to list of events
        events = self._parse_schedule(schedule)
TemporalAnalyzer._determine_season_year method · python · L84-L107 (24 LOC)
packages/pitlane-agent/src/pitlane_agent/temporal/analyzer.py
    def _determine_season_year(self, current_time: datetime) -> int:
        """Determine which F1 season year to query.

        Args:
            current_time: Current time (UTC)

        Returns:
            Season year
        """
        year = current_time.year

        # If it's January or February, we might be in off-season
        # Use previous year if current year schedule isn't available yet
        if current_time.month <= 2:
            try:
                # Try current year first
                fastf1.get_event_schedule(year, include_testing=False)
                return year
            except (ValueError, KeyError, AttributeError, ImportError):
                # FastF1 schedule not available for current year yet
                # Fall back to previous year
                return year - 1

        return year
TemporalAnalyzer._parse_schedule method · python · L109-L148 (40 LOC)
packages/pitlane-agent/src/pitlane_agent/temporal/analyzer.py
    def _parse_schedule(self, schedule: pd.DataFrame) -> list[dict]:
        """Parse FastF1 schedule DataFrame into list of event dicts.

        Args:
            schedule: FastF1 event schedule DataFrame

        Returns:
            List of parsed event dictionaries
        """
        events = []
        for _, event in schedule.iterrows():
            # Parse sessions
            sessions = []
            for i in range(1, 6):
                session_name = event.get(f"Session{i}")
                session_date = event.get(f"Session{i}Date")
                session_date_utc = event.get(f"Session{i}DateUtc")

                if session_name and pd.notna(session_date):
                    sessions.append(
                        {
                            "name": session_name,
                            "date_local": session_date,
                            "date_utc": session_date_utc,
                        }
                    )

            events.append(
                
TemporalAnalyzer._determine_season_phase method · python · L150-L192 (43 LOC)
packages/pitlane-agent/src/pitlane_agent/temporal/analyzer.py
    def _determine_season_phase(self, current_time: datetime, events: list[dict]) -> F1Season:
        """Determine current F1 season phase.

        Args:
            current_time: Current time (UTC)
            events: List of race events

        Returns:
            Season phase
        """
        if not events:
            return F1Season.OFF_SEASON

        # Get first and last race dates
        first_race_date = events[0]["event_date"]
        last_race_date = events[-1]["event_date"]

        # Convert to datetime if needed
        if isinstance(first_race_date, pd.Timestamp):
            first_race_date = first_race_date.to_pydatetime()
        if isinstance(last_race_date, pd.Timestamp):
            last_race_date = last_race_date.to_pydatetime()

        # Ensure timezone awareness
        if first_race_date.tzinfo is None:
            first_race_date = first_race_date.replace(tzinfo=UTC)
        if last_race_date.tzinfo is None:
            last_race_date = last_race_date
TemporalAnalyzer._find_current_weekend method · python · L194-L251 (58 LOC)
packages/pitlane-agent/src/pitlane_agent/temporal/analyzer.py
    def _find_current_weekend(self, current_time: datetime, events: list[dict]) -> RaceWeekendContext | None:
        """Find active race weekend if any.

        A weekend is considered "current" from Thursday 48h before first session
        until 6 hours after the race.

        Args:
            current_time: Current time (UTC)
            events: List of race events

        Returns:
            Current race weekend context or None
        """
        for event in events:
            sessions = event["sessions"]
            if not sessions:
                continue

            # Parse session dates
            parsed_sessions = []
            for session in sessions:
                date_utc = session["date_utc"]
                date_local = session["date_local"]

                if isinstance(date_utc, pd.Timestamp):
                    date_utc = date_utc.to_pydatetime()
                if isinstance(date_local, pd.Timestamp):
                    date_local = date_local.to_pyda
TemporalAnalyzer._find_last_completed_race method · python · L253-L275 (23 LOC)
packages/pitlane-agent/src/pitlane_agent/temporal/analyzer.py
    def _find_last_completed_race(self, current_time: datetime, events: list[dict]) -> RaceWeekendContext | None:
        """Find most recently completed race.

        Args:
            current_time: Current time (UTC)
            events: List of race events

        Returns:
            Last completed race context or None
        """
        completed_events = []

        for event in events:
            if self._is_race_completed(event, current_time):
                completed_events.append(event)

        if not completed_events:
            return None

        # Get the most recent one
        last_event = completed_events[-1]
        sessions = self._parse_event_sessions(last_event)
        return self._build_race_weekend_context(last_event, sessions, current_time, completed=True)
TemporalAnalyzer._find_next_race method · python · L277-L298 (22 LOC)
packages/pitlane-agent/src/pitlane_agent/temporal/analyzer.py
    def _find_next_race(self, current_time: datetime, events: list[dict]) -> RaceWeekendContext | None:
        """Find next upcoming race weekend.

        Args:
            current_time: Current time (UTC)
            events: List of race events

        Returns:
            Next race context or None
        """
        for event in events:
            event_date = event["event_date"]
            if isinstance(event_date, pd.Timestamp):
                event_date = event_date.to_pydatetime()
            if event_date.tzinfo is None:
                event_date = event_date.replace(tzinfo=UTC)

            if event_date > current_time:
                sessions = self._parse_event_sessions(event)
                return self._build_race_weekend_context(event, sessions, current_time)

        return None
Repobility · MCP-ready · https://repobility.com
TemporalAnalyzer._is_race_completed method · python · L300-L328 (29 LOC)
packages/pitlane-agent/src/pitlane_agent/temporal/analyzer.py
    def _is_race_completed(self, event: dict, current_time: datetime) -> bool:
        """Check if a race is completed.

        Args:
            event: Event dictionary
            current_time: Current time (UTC)

        Returns:
            True if race is completed
        """
        # Find the race session (last session)
        sessions = event.get("sessions", [])
        if not sessions:
            return False

        last_session = sessions[-1]
        race_date = last_session["date_utc"]

        if isinstance(race_date, pd.Timestamp):
            race_date = race_date.to_pydatetime()
        if race_date.tzinfo is None:
            race_date = race_date.replace(tzinfo=UTC)

        # Race is completed if it's more than 4 hours in the past
        # Extended from 3 to 4 hours to handle edge cases:
        # - Red-flagged races (e.g., Belgium 2021, Monaco 2011)
        # - Very long races with multiple safety car periods
        # - Post-race ceremonies and interviews
   
TemporalAnalyzer._parse_event_sessions method · python · L330-L366 (37 LOC)
packages/pitlane-agent/src/pitlane_agent/temporal/analyzer.py
    def _parse_event_sessions(self, event: dict) -> list[dict]:
        """Parse sessions for an event with timezone handling.

        Args:
            event: Event dictionary

        Returns:
            List of parsed session dictionaries
        """
        parsed_sessions = []
        for session in event.get("sessions", []):
            date_utc = session["date_utc"]
            date_local = session["date_local"]

            if isinstance(date_utc, pd.Timestamp):
                date_utc = date_utc.to_pydatetime()
            if isinstance(date_local, pd.Timestamp):
                date_local = date_local.to_pydatetime()

            # Ensure UTC times have timezone info
            if date_utc.tzinfo is None:
                date_utc = date_utc.replace(tzinfo=UTC)

            # Keep local times as naive datetimes (no timezone)
            # They represent the local time at the circuit
            if date_local.tzinfo is not None:
                date_local = date_local.repla
TemporalAnalyzer._build_race_weekend_context method · python · L368-L465 (98 LOC)
packages/pitlane-agent/src/pitlane_agent/temporal/analyzer.py
    def _build_race_weekend_context(
        self, event: dict, sessions: list[dict], current_time: datetime, completed: bool = False
    ) -> RaceWeekendContext:
        """Build race weekend context from event data.

        Args:
            event: Event dictionary
            sessions: Parsed sessions
            current_time: Current time (UTC)
            completed: Whether the race is completed

        Returns:
            Race weekend context
        """
        # Determine session type mapping
        session_type_map = {
            "Practice 1": "FP1",
            "Practice 2": "FP2",
            "Practice 3": "FP3",
            "Qualifying": "Q",
            "Sprint Qualifying": "SQ",
            "Sprint": "S",
            "Race": "R",
        }

        # Build session contexts
        session_contexts = []
        current_session = None
        next_session = None

        for session in sessions:
            session_type = session_type_map.get(session["name"], "UNKNOWN"
TemporalAnalyzer._determine_weekend_phase method · python · L467-L506 (40 LOC)
packages/pitlane-agent/src/pitlane_agent/temporal/analyzer.py
    def _determine_weekend_phase(
        self, sessions: list[SessionContext], current_time: datetime, completed: bool
    ) -> RaceWeekendPhase:
        """Determine current phase of the race weekend.

        Args:
            sessions: List of session contexts
            current_time: Current time (UTC)
            completed: Whether the race is completed

        Returns:
            Race weekend phase
        """
        if completed:
            return RaceWeekendPhase.POST_RACE

        # Check for live or recent sessions
        for session in sessions:
            if session.is_live:
                if "Sprint" in session.name:
                    return RaceWeekendPhase.SPRINT
                elif "Qualifying" in session.name:
                    return RaceWeekendPhase.QUALIFYING
                elif "Race" in session.name:
                    return RaceWeekendPhase.RACE
                elif "Practice" in session.name:
                    return RaceWeekendPhase.PRACTICE
TemporalAnalyzer._compute_ttl method · python · L508-L548 (41 LOC)
packages/pitlane-agent/src/pitlane_agent/temporal/analyzer.py
    def _compute_ttl(self, season_phase: F1Season, current_weekend: RaceWeekendContext | None) -> int:
        """Compute cache TTL based on temporal state.

        Args:
            season_phase: Current season phase
            current_weekend: Current race weekend (if any)

        Returns:
            TTL in seconds
        """
        # Off-season: 7 days
        if season_phase == F1Season.OFF_SEASON:
            return 7 * 24 * 60 * 60

        # Pre-season: 3 days
        if season_phase == F1Season.PRE_SEASON:
            return 3 * 24 * 60 * 60

        # Post-season: 5 days
        if season_phase == F1Season.POST_SEASON:
            return 5 * 24 * 60 * 60

        # During race weekend
        if current_weekend:
            # Check if any session is live
            if current_weekend.current_session and current_weekend.current_session.is_live:
                return 5 * 60  # 5 minutes during live session

            # Check if next session is soon (within 2 hours)
   
TemporalCache.__init__ method · python · L19-L30 (12 LOC)
packages/pitlane-agent/src/pitlane_agent/temporal/cache.py
    def __init__(self, cache_dir: Path | None = None):
        """Initialize cache.

        Args:
            cache_dir: Cache directory (defaults to ~/.pitlane/cache/temporal)
        """
        if cache_dir is None:
            cache_dir = Path.home() / ".pitlane" / "cache" / "temporal"

        self.cache_dir = Path(cache_dir)
        self.cache_dir.mkdir(parents=True, exist_ok=True)
        self.cache_file = self.cache_dir / "context_cache.json"
TemporalCache.get method · python · L32-L65 (34 LOC)
packages/pitlane-agent/src/pitlane_agent/temporal/cache.py
    def get(self, current_time: datetime) -> TemporalContext | None:
        """Retrieve cached context if valid.

        Args:
            current_time: Current time (UTC, must be timezone-aware)

        Returns:
            Cached context if valid, None otherwise

        Raises:
            ValueError: If current_time is not timezone-aware
        """
        if current_time.tzinfo is None:
            raise ValueError("current_time must be timezone-aware (include tzinfo)")

        if not self.cache_file.exists():
            return None

        try:
            with open(self.cache_file) as f:
                data = json.load(f)

            # Check if cache is expired
            expires_at = datetime.fromisoformat(data["expires_at"])
            if current_time > expires_at:
                return None

            # Deserialize context
            context = self._deserialize_context(data["context"])
            return context

        except (json.JSONDecodeError, KeyError, 
TemporalCache.set method · python · L67-L88 (22 LOC)
packages/pitlane-agent/src/pitlane_agent/temporal/cache.py
    def set(self, context: TemporalContext) -> None:
        """Cache context with TTL.

        Args:
            context: Temporal context to cache
        """
        # Calculate expiration time
        from datetime import timedelta

        expires_at = context.cache_timestamp + timedelta(seconds=context.ttl_seconds)

        # Serialize to JSON
        cache_data = {
            "timestamp": context.cache_timestamp.isoformat(),
            "ttl_seconds": context.ttl_seconds,
            "expires_at": expires_at.isoformat(),
            "context": context.to_dict(),
        }

        # Write to file
        with open(self.cache_file, "w") as f:
            json.dump(cache_data, f, indent=2)
Citation: Repobility (2026). State of AI-Generated Code. https://repobility.com/research/
TemporalCache._deserialize_context method · python · L95-L118 (24 LOC)
packages/pitlane-agent/src/pitlane_agent/temporal/cache.py
    def _deserialize_context(self, data: dict) -> TemporalContext:
        """Deserialize context from dictionary.

        Args:
            data: Serialized context dictionary

        Returns:
            Temporal context
        """
        return TemporalContext(
            current_time_utc=datetime.fromisoformat(data["current_time_utc"]),
            current_season=data["current_season"],
            season_phase=F1Season(data["season_phase"]),
            current_weekend=self._deserialize_weekend(data["current_weekend"]) if data["current_weekend"] else None,
            last_completed_race=(
                self._deserialize_weekend(data["last_completed_race"]) if data["last_completed_race"] else None
            ),
            next_race=self._deserialize_weekend(data["next_race"]) if data["next_race"] else None,
            races_completed=data["races_completed"],
            races_remaining=data["races_remaining"],
            days_until_next_race=data["days_until_next_race"]
TemporalCache._deserialize_weekend method · python · L120-L140 (21 LOC)
packages/pitlane-agent/src/pitlane_agent/temporal/cache.py
    def _deserialize_weekend(self, data: dict) -> RaceWeekendContext:
        """Deserialize race weekend context.

        Args:
            data: Serialized weekend dictionary

        Returns:
            Race weekend context
        """
        return RaceWeekendContext(
            round_number=data["round_number"],
            event_name=data["event_name"],
            country=data["country"],
            location=data["location"],
            event_date=datetime.fromisoformat(data["event_date"]),
            phase=RaceWeekendPhase(data["phase"]),
            current_session=self._deserialize_session(data["current_session"]) if data["current_session"] else None,
            next_session=self._deserialize_session(data["next_session"]) if data["next_session"] else None,
            all_sessions=[self._deserialize_session(s) for s in data["all_sessions"]],
            is_sprint_weekend=data["is_sprint_weekend"],
        )
TemporalCache._deserialize_session method · python · L142-L160 (19 LOC)
packages/pitlane-agent/src/pitlane_agent/temporal/cache.py
    def _deserialize_session(self, data: dict) -> SessionContext:
        """Deserialize session context.

        Args:
            data: Serialized session dictionary

        Returns:
            Session context
        """
        return SessionContext(
            name=data["name"],
            session_type=data["session_type"],
            date_utc=datetime.fromisoformat(data["date_utc"]),
            date_local=datetime.fromisoformat(data["date_local"]),
            is_live=data["is_live"],
            is_recent=data["is_recent"],
            minutes_until=data["minutes_until"],
            minutes_since=data["minutes_since"],
        )
SessionContext.to_dict method · python · L42-L53 (12 LOC)
packages/pitlane-agent/src/pitlane_agent/temporal/context.py
    def to_dict(self) -> dict:
        """Convert to dictionary with ISO format dates."""
        return {
            "name": self.name,
            "session_type": self.session_type,
            "date_utc": self.date_utc.isoformat(),
            "date_local": self.date_local.isoformat(),
            "is_live": self.is_live,
            "is_recent": self.is_recent,
            "minutes_until": self.minutes_until,
            "minutes_since": self.minutes_since,
        }
RaceWeekendContext.to_dict method · python · L71-L84 (14 LOC)
packages/pitlane-agent/src/pitlane_agent/temporal/context.py
    def to_dict(self) -> dict:
        """Convert to dictionary."""
        return {
            "round_number": self.round_number,
            "event_name": self.event_name,
            "country": self.country,
            "location": self.location,
            "event_date": self.event_date.isoformat(),
            "phase": self.phase.value,
            "current_session": self.current_session.to_dict() if self.current_session else None,
            "next_session": self.next_session.to_dict() if self.next_session else None,
            "all_sessions": [s.to_dict() for s in self.all_sessions],
            "is_sprint_weekend": self.is_sprint_weekend,
        }
TemporalContext.to_dict method · python · L113-L127 (15 LOC)
packages/pitlane-agent/src/pitlane_agent/temporal/context.py
    def to_dict(self) -> dict:
        """Convert to dictionary."""
        return {
            "current_time_utc": self.current_time_utc.isoformat(),
            "current_season": self.current_season,
            "season_phase": self.season_phase.value,
            "current_weekend": self.current_weekend.to_dict() if self.current_weekend else None,
            "last_completed_race": self.last_completed_race.to_dict() if self.last_completed_race else None,
            "next_race": self.next_race.to_dict() if self.next_race else None,
            "races_completed": self.races_completed,
            "races_remaining": self.races_remaining,
            "days_until_next_race": self.days_until_next_race,
            "cache_timestamp": self.cache_timestamp.isoformat(),
            "ttl_seconds": self.ttl_seconds,
        }
TemporalContextManager.__init__ method · python · L133-L143 (11 LOC)
packages/pitlane-agent/src/pitlane_agent/temporal/context.py
    def __init__(self, cache_dir: Path | None = None):
        """Initialize temporal context manager.

        Args:
            cache_dir: Override cache directory (defaults to ~/.pitlane/cache/temporal)
        """
        from pitlane_agent.temporal.analyzer import TemporalAnalyzer
        from pitlane_agent.temporal.cache import TemporalCache

        self.cache = TemporalCache(cache_dir)
        self.analyzer = TemporalAnalyzer()
TemporalContextManager.get_context method · python · L145-L169 (25 LOC)
packages/pitlane-agent/src/pitlane_agent/temporal/context.py
    def get_context(self, force_refresh: bool = False, current_time: datetime | None = None) -> TemporalContext:
        """Get current temporal context.

        Args:
            force_refresh: Force fetch from FastF1 (ignore cache)
            current_time: Override current time (for testing)

        Returns:
            Complete temporal context
        """
        now = current_time or datetime.now(UTC)

        # Check cache
        if not force_refresh:
            cached = self.cache.get(now)
            if cached:
                return cached

        # Fetch and analyze
        context = self.analyzer.analyze(now)

        # Cache with intelligent TTL
        self.cache.set(context)

        return context
Generated by Repobility's multi-pass static-analysis pipeline (https://repobility.com)
get_temporal_context function · python · L176-L197 (22 LOC)
packages/pitlane-agent/src/pitlane_agent/temporal/context.py
def get_temporal_context(force_refresh: bool = False, current_time: datetime | None = None) -> TemporalContext:
    """Get current F1 temporal context.

    This is the primary public API for accessing temporal context.

    Args:
        force_refresh: Force fetch from FastF1 (ignore cache)
        current_time: Override current time (for testing)

    Returns:
        Complete temporal context

    Example:
        >>> context = get_temporal_context()
        >>> print(f"Current season: {context.current_season}")
        >>> if context.current_weekend:
        >>>     print(f"Race weekend: {context.current_weekend.event_name}")
    """
    global _manager
    if _manager is None:
        _manager = TemporalContextManager()
    return _manager.get_context(force_refresh=force_refresh, current_time=current_time)
format_for_system_prompt function · python · L10-L25 (16 LOC)
packages/pitlane-agent/src/pitlane_agent/temporal/formatter.py
def format_for_system_prompt(context: TemporalContext, verbosity: str = "normal") -> str:
    """Format temporal context for inclusion in system prompt.

    Args:
        context: Temporal context
        verbosity: "minimal", "normal", or "detailed"

    Returns:
        Formatted string for system prompt
    """
    if verbosity == "minimal":
        return _format_minimal(context)
    elif verbosity == "detailed":
        return _format_detailed(context)
    else:
        return _format_normal(context)
_format_minimal function · python · L28-L57 (30 LOC)
packages/pitlane-agent/src/pitlane_agent/temporal/formatter.py
def _format_minimal(context: TemporalContext) -> str:
    """Minimal format for system prompt.

    Args:
        context: Temporal context

    Returns:
        Formatted string
    """
    lines = [
        "## F1 Temporal Context",
        "",
        f"**Season:** {context.current_season} ({context.season_phase.value.replace('_', ' ').title()})",
    ]

    if context.next_race:
        lines.append(
            f"**Next Race:** {context.next_race.event_name} (Round {context.next_race.round_number}) - "
            f"{context.next_race.event_date.strftime('%B %d, %Y')}"
        )
        if context.days_until_next_race is not None:
            lines.append(f"  ({context.days_until_next_race} days)")

    if context.last_completed_race:
        lines.append(
            f"**Last Race:** {context.last_completed_race.event_name} - "
            f"Completed {context.last_completed_race.event_date.strftime('%B %d, %Y')}"
        )

    return "\n".join(lines)
_format_normal function · python · L60-L137 (78 LOC)
packages/pitlane-agent/src/pitlane_agent/temporal/formatter.py
def _format_normal(context: TemporalContext) -> str:
    """Normal format for system prompt.

    Args:
        context: Temporal context

    Returns:
        Formatted string
    """
    lines = ["## F1 Temporal Context", ""]

    # Current season and phase
    lines.append(f"**Current Season:** {context.current_season}")

    phase_display = context.season_phase.value.replace("_", " ").title()
    if context.season_phase == F1Season.IN_SEASON:
        lines.append(
            f"**Phase:** {phase_display} "
            f"(Round {context.races_completed} of {context.races_completed + context.races_remaining} completed)"
        )
    else:
        lines.append(f"**Phase:** {phase_display}")

    lines.append("")

    # Current race weekend (if any)
    if context.current_weekend:
        lines.extend(_format_current_weekend(context.current_weekend))
        lines.append("")

    # Next race
    if context.next_race and not context.current_weekend:
        lines.append(f"**Next Race:*
_format_detailed function · python · L140-L205 (66 LOC)
packages/pitlane-agent/src/pitlane_agent/temporal/formatter.py
def _format_detailed(context: TemporalContext) -> str:
    """Detailed format for system prompt.

    Args:
        context: Temporal context

    Returns:
        Formatted string
    """
    lines = ["## F1 Temporal Context", ""]

    # Current season and phase
    lines.append(f"**Current Season:** {context.current_season}")

    phase_display = context.season_phase.value.replace("_", " ").title()
    if context.season_phase == F1Season.IN_SEASON:
        lines.append(
            f"**Phase:** {phase_display} "
            f"(Round {context.races_completed} of {context.races_completed + context.races_remaining})"
        )
    else:
        lines.append(f"**Phase:** {phase_display}")

    lines.append("")

    # Current race weekend (if any)
    if context.current_weekend:
        lines.extend(_format_current_weekend_detailed(context.current_weekend, context.current_time_utc))
        lines.append("")

    # Next race (if not current weekend)
    if context.next_race and not context
_format_current_weekend function · python · L208-L260 (53 LOC)
packages/pitlane-agent/src/pitlane_agent/temporal/formatter.py
def _format_current_weekend(weekend: "RaceWeekendContext") -> list[str]:
    """Format current race weekend for normal verbosity.

    Args:
        weekend: Race weekend context

    Returns:
        List of formatted lines
    """
    lines = [f"**ACTIVE RACE WEEKEND: {weekend.event_name}**"]
    lines.append(f"- Round {weekend.round_number} in {weekend.location}, {weekend.country}")

    if weekend.is_sprint_weekend:
        lines.append("- Event Format: Sprint weekend")

    # Current session
    if weekend.current_session:
        lines.append("")
        if weekend.current_session.is_live:
            lines.append(f"**Current Session:** {weekend.current_session.name} ⚡ LIVE")
            if weekend.current_session.minutes_since:
                lines.append(f"- Started {weekend.current_session.minutes_since} minutes ago")
        else:
            lines.append(f"**Recent Session:** {weekend.current_session.name}")
            if weekend.current_session.minutes_since:
            
_format_current_weekend_detailed function · python · L263-L325 (63 LOC)
packages/pitlane-agent/src/pitlane_agent/temporal/formatter.py
def _format_current_weekend_detailed(weekend: "RaceWeekendContext", current_time) -> list[str]:
    """Format current race weekend for detailed verbosity.

    Args:
        weekend: Race weekend context
        current_time: Current time (UTC)

    Returns:
        List of formatted lines
    """
    lines = [f"**ACTIVE RACE WEEKEND: {weekend.event_name}**"]
    lines.append(f"- Round {weekend.round_number} in {weekend.location}, {weekend.country}")
    lines.append(f"- Event Date: {weekend.event_date.strftime('%B %d, %Y')}")

    if weekend.is_sprint_weekend:
        lines.append("- Event Format: Sprint weekend")
    else:
        lines.append("- Event Format: Conventional weekend")

    # Current session
    if weekend.current_session:
        lines.append("")
        if weekend.current_session.is_live:
            lines.append(f"**⚡ LIVE SESSION: {weekend.current_session.name}**")
            if weekend.current_session.minutes_since:
                lines.append(f"- Started {weeken
format_as_text function · python · L328-L412 (85 LOC)
packages/pitlane-agent/src/pitlane_agent/temporal/formatter.py
def format_as_text(context: TemporalContext) -> str:
    """Format temporal context as human-readable text (for CLI display).

    Args:
        context: Temporal context

    Returns:
        Formatted text
    """
    lines = ["=" * 60]
    lines.append("F1 TEMPORAL CONTEXT")
    lines.append("=" * 60)
    lines.append("")

    # Basic info
    lines.append(f"Current Time (UTC): {context.current_time_utc.strftime('%Y-%m-%d %H:%M:%S')}")
    lines.append(f"Season: {context.current_season}")
    lines.append(f"Phase: {context.season_phase.value.replace('_', ' ').title()}")
    lines.append(f"Races Completed: {context.races_completed}")
    lines.append(f"Races Remaining: {context.races_remaining}")
    lines.append("")

    # Current weekend
    if context.current_weekend:
        lines.append("-" * 60)
        lines.append("CURRENT RACE WEEKEND")
        lines.append("-" * 60)
        lines.append(f"Event: {context.current_weekend.event_name}")
        lines.append(f"Round: {context.c
Provenance: Repobility (https://repobility.com) — every score reproducible from /scan/
_is_allowed_bash_command function · python · L37-L75 (39 LOC)
packages/pitlane-agent/src/pitlane_agent/tool_permissions.py
def _is_allowed_bash_command(command: str) -> bool:
    """Check if Bash command is allowed (pitlane CLI only).

    Validates that:
    1. Only whitelisted environment variables are used
    2. The command starts with "pitlane "

    Args:
        command: The bash command to validate.

    Returns:
        True if command is allowed, False otherwise.
    """
    if not command:
        return False

    cmd = command.strip()
    parts = cmd.split()

    # Extract and validate environment variables
    env_vars = []
    for i, part in enumerate(parts):
        if "=" not in part:
            # Found the actual command (not an env var assignment)
            cmd = " ".join(parts[i:])
            break
        # Extract env var name before '='
        env_var = part.split("=")[0]
        env_vars.append(env_var)
    else:
        # All parts are env vars, no command
        return False

    # Check if any env var is not in whitelist
    for env_var in env_vars:
        if env_var not i
_is_within_workspace function · python · L78-L100 (23 LOC)
packages/pitlane-agent/src/pitlane_agent/tool_permissions.py
def _is_within_workspace(file_path: str, workspace_dir: str | None) -> bool:
    """Check if file path is within the workspace directory.

    Args:
        file_path: The file path to validate.
        workspace_dir: The workspace directory path.

    Returns:
        True if file is within workspace, False otherwise.
    """
    if workspace_dir is None:
        return False

    try:
        from pathlib import Path

        file_resolved = Path(file_path).resolve()
        workspace_resolved = Path(workspace_dir).resolve()

        # Check if file_path is within workspace_dir
        return str(file_resolved).startswith(str(workspace_resolved))
    except Exception:
        return False
can_use_tool function · python · L103-L249 (147 LOC)
packages/pitlane-agent/src/pitlane_agent/tool_permissions.py
async def can_use_tool(
    tool_name: str,
    input_params: dict[str, Any],
    context: ToolPermissionContext | dict[str, Any],
) -> PermissionResultAllow | PermissionResultDeny:
    """Validate tool usage with restrictions for Bash, Read, Write, and WebFetch.

    Args:
        tool_name: Name of the tool being invoked.
        input_params: Parameters passed to the tool.
        context: Permission context including workspace directory (dict or ToolPermissionContext).

    Returns:
        PermissionResultAllow if the tool usage is permitted.
        PermissionResultDeny if the tool usage should be blocked.
    """
    # Convert context to dict if needed
    context_dict = context if isinstance(context, dict) else {}

    # Restrict Bash to pitlane CLI only
    if tool_name == "Bash":
        command = input_params.get("command", "")

        if not _is_allowed_bash_command(command):
            denial_msg = (
                "Bash is restricted to 'pitlane' CLI commands only. "
 
is_tracing_enabled function · python · L40-L51 (12 LOC)
packages/pitlane-agent/src/pitlane_agent/tracing.py
def is_tracing_enabled() -> bool:
    """Check if tracing is enabled via environment variable or programmatically.

    Returns:
        True if tracing is enabled, False otherwise.
        Programmatic setting (_tracing_enabled) overrides environment variable.
    """
    if _tracing_enabled is not None:
        # Programmatic setting overrides environment variable
        return _tracing_enabled
    # Fall back to environment variable
    return os.getenv("PITLANE_TRACING_ENABLED", "0") == "1"
_initialize_tracer_provider function · python · L54-L94 (41 LOC)
packages/pitlane-agent/src/pitlane_agent/tracing.py
def _initialize_tracer_provider() -> None:
    """Initialize the OpenTelemetry tracer provider with console exporter.

    This is called automatically when tracing is enabled. It sets up a console
    exporter that outputs to stderr to avoid mixing with agent responses.

    The span processor can be configured via PITLANE_SPAN_PROCESSOR environment
    variable:
    - "simple" (default): Uses SimpleSpanProcessor for synchronous export.
      Required for tests to ensure spans are flushed before assertions.
    - "batch": Uses BatchSpanProcessor for better production performance.
      Spans are exported asynchronously in batches.
    """
    global _provider_initialized

    if _provider_initialized:
        return

    # Create resource with service name
    resource = Resource.create({"service.name": "pitlane-f1-agent"})

    # Create tracer provider
    provider = TracerProvider(resource=resource)

    # Create console exporter (outputs to stderr)
    console_exporter = ConsoleSpa
get_tracer function · python · L97-L121 (25 LOC)
packages/pitlane-agent/src/pitlane_agent/tracing.py
def get_tracer() -> trace.Tracer:
    """Get the OpenTelemetry tracer instance.

    Lazily initializes the tracer provider if tracing is enabled.

    Note:
        This function has a benign race condition when called concurrently
        from multiple coroutines during initial startup. Multiple TracerProvider
        instances may be created, but this is harmless as OpenTelemetry's global
        tracer provider is set atomically.

    Returns:
        Tracer instance (may be a no-op tracer if tracing is disabled).
    """
    global _tracer

    if not is_tracing_enabled():
        # Return no-op tracer if tracing is disabled
        return trace.get_tracer(__name__)

    if _tracer is None:
        _initialize_tracer_provider()
        _tracer = trace.get_tracer(__name__, "0.1.0")

    return _tracer
enable_tracing function · python · L124-L130 (7 LOC)
packages/pitlane-agent/src/pitlane_agent/tracing.py
def enable_tracing() -> None:
    """Programmatically enable tracing.

    This overrides the PITLANE_TRACING_ENABLED environment variable.
    """
    global _tracing_enabled
    _tracing_enabled = True
tool_span function · python · L140-L175 (36 LOC)
packages/pitlane-agent/src/pitlane_agent/tracing.py
def tool_span(tool_name: str, **attributes: Any):
    """Create a span for a tool call with minimal console output.

    This context manager creates a span with the tool name and optional
    attributes. The console exporter will output a simple log line like:
    [TOOL] ToolName: key_parameter

    Args:
        tool_name: Name of the tool being called (e.g., "Bash", "Skill", "WebFetch").
        **attributes: Span attributes to record (e.g., tool.key_param, tool.permission).

    Example:
        with tool_span("Bash", **{"tool.key_param": "ls -la"}):
            # Execute tool
            pass
    """
    tracer = get_tracer()

    if not is_tracing_enabled():
        # If tracing is disabled, just yield without creating a span
        yield None
        return

    with tracer.start_as_current_span(f"tool.{tool_name}") as span:
        # Set tool name attribute
        span.set_attribute("tool.name", tool_name)

        # Set additional attributes
        for key, value in attribu
Repobility · MCP-ready · https://repobility.com
_log_tool_call function · python · L178-L200 (23 LOC)
packages/pitlane-agent/src/pitlane_agent/tracing.py
def _log_tool_call(tool_name: str, attributes: dict[str, Any]) -> None:
    """Log a minimal tool call to stderr.

    Format: [TOOL] ToolName: key_parameter

    Args:
        tool_name: Name of the tool.
        attributes: Span attributes.
    """
    key_param = attributes.get("tool.key_param", "")
    permission = attributes.get("tool.permission", "")
    denial_reason = attributes.get("tool.denial_reason", "")

    # Build output message
    if permission == "denied":
        msg = f"[TOOL] {tool_name}: {key_param} → DENIED"
        if denial_reason:
            msg += f" ({denial_reason})"
    else:
        msg = f"[TOOL] {tool_name}: {key_param}"

    # Write to stderr
    print(msg, file=sys.stderr, flush=True)
log_permission_check function · python · L203-L221 (19 LOC)
packages/pitlane-agent/src/pitlane_agent/tracing.py
def log_permission_check(tool_name: str, allowed: bool, reason: str = "") -> None:
    """Log a permission check result.

    This is called from the permission callback to show when tools are
    denied access.

    Args:
        tool_name: Name of the tool.
        allowed: Whether the tool was allowed.
        reason: Reason for denial (if denied).
    """
    if not is_tracing_enabled():
        return

    if not allowed:
        msg = f"[PERMISSION] {tool_name} → DENIED"
        if reason:
            msg += f": {reason}"
        print(msg, file=sys.stderr, flush=True)
pre_tool_use_hook function · python · L227-L256 (30 LOC)
packages/pitlane-agent/src/pitlane_agent/tracing.py
async def pre_tool_use_hook(
    hook_input: PreToolUseHookInput,
    block_reason: str | None,
    hook_context: HookContext,
) -> SyncHookJSONOutput:
    """Hook called before a tool is executed.

    Logs the tool call to console if tracing is enabled.

    Args:
        hook_input: Input data including tool_name and tool_input.
        block_reason: Reason if the tool was blocked (unused).
        hook_context: Hook context (unused).

    Returns:
        SyncHookJSONOutput to continue execution.
    """
    if not is_tracing_enabled():
        return SyncHookJSONOutput(continue_=True)

    tool_name = hook_input["tool_name"]
    tool_input = hook_input["tool_input"]

    # Extract key parameter based on tool type
    key_param = _extract_key_param(tool_name, tool_input)

    # Log the tool call
    _log_tool_call(tool_name, {"tool.key_param": key_param})

    return SyncHookJSONOutput(continue_=True)
‹ prevpage 3 / 5next ›