Function bodies 217 total
_generate_title function · python · L449-L465 (17 LOC)packages/pitlane-agent/src/pitlane_agent/commands/workspace/operations.py
def _generate_title(message: str, max_length: int = 50) -> str:
    """Generate a display title from the first message.

    Runs of whitespace (including newlines) are collapsed to single spaces.
    A message that fits within ``max_length`` is returned unchanged; a longer
    one is cut at a word boundary and suffixed with ``"..."`` so that the
    whole title, ellipsis included, never exceeds ``max_length``.

    Args:
        message: The user's first message.
        max_length: Maximum title length, including the trailing ellipsis.

    Returns:
        A truncated title suitable for display.
    """
    ellipsis = "..."
    # Collapse all whitespace so the title is a single clean line.
    clean = " ".join(message.split())
    if len(clean) <= max_length:
        return clean

    # Reserve room for the ellipsis so the result honours max_length.
    budget = max_length - len(ellipsis)
    if budget <= 0:
        # No room for an ellipsis at all: hard-cut to the requested length.
        return clean[: max(max_length, 0)]

    head = clean[:budget]
    # Only back up to the previous space when the cut lands inside a word;
    # a single over-long word is kept as a hard cut.
    if clean[budget] != " " and " " in head:
        head = head.rsplit(" ", 1)[0]
    return head.rstrip() + ellipsis

# create_conversation function · python · L468-L504 (37 LOC)packages/pitlane-agent/src/pitlane_agent/commands/workspace/operations.py
def create_conversation(
workspace_id: str,
agent_session_id: str,
first_message: str,
) -> dict:
"""Create a new conversation entry.
Args:
workspace_id: Workspace identifier.
agent_session_id: Claude SDK session ID for resumption.
first_message: First user message (used for title/preview).
Returns:
The created conversation dict.
Raises:
ValueError: If workspace doesn't exist.
"""
conv_id = f"conv_{uuid.uuid4().hex[:12]}"
now = datetime.now(UTC).isoformat() + "Z"
conversation = {
"id": conv_id,
"agent_session_id": agent_session_id,
"title": _generate_title(first_message),
"created_at": now,
"last_message_at": now,
"message_count": 1,
"preview": first_message[:100] + ("..." if len(first_message) > 100 else ""),
}
data = load_conversations(workspace_id)
data["conversations"].insert(0, conversation) # Most recent first
data["actiupdate_conversation function · python · L507-L533 (27 LOC)packages/pitlane-agent/src/pitlane_agent/commands/workspace/operations.py
def update_conversation(
    workspace_id: str,
    conversation_id: str,
    message_count_delta: int = 1,
) -> None:
    """Refresh a conversation's metadata after new messages were exchanged.

    Stamps ``last_message_at`` with the current UTC time, bumps
    ``message_count`` and persists the change. If the conversation is not
    present, a warning is logged and nothing is saved.

    Args:
        workspace_id: Workspace identifier.
        conversation_id: The conversation to update.
        message_count_delta: Number of messages to add to count.
    """
    store = load_conversations(workspace_id)
    target = next(
        (entry for entry in store["conversations"] if entry["id"] == conversation_id),
        None,
    )
    if target is None:
        logger.warning(f"Conversation {conversation_id} not found in workspace {workspace_id}")
        return

    # NOTE(review): isoformat() on an aware UTC datetime already ends in
    # "+00:00", so appending "Z" yields "...+00:00Z", which is not valid
    # ISO 8601. Kept as-is because create_conversation writes the same
    # format; both writers should be fixed together.
    target["last_message_at"] = datetime.now(UTC).isoformat() + "Z"
    target["message_count"] += message_count_delta
    save_conversations(workspace_id, store)

# get_active_conversation function · python · L536-L554 (19 LOC)packages/pitlane-agent/src/pitlane_agent/commands/workspace/operations.py
def get_active_conversation(workspace_id: str) -> dict | None:
    """Look up the conversation currently marked active in a workspace.

    Args:
        workspace_id: Workspace identifier.

    Returns:
        The conversation dict whose ``id`` matches the stored
        ``active_conversation_id``, or None when no id is stored or no
        conversation matches it.
    """
    store = load_conversations(workspace_id)
    wanted_id = store.get("active_conversation_id")
    if not wanted_id:
        return None
    return next(
        (entry for entry in store["conversations"] if entry["id"] == wanted_id),
        None,
    )

# set_active_conversation function · python · L557-L566 (10 LOC)packages/pitlane-agent/src/pitlane_agent/commands/workspace/operations.py
def set_active_conversation(workspace_id: str, conversation_id: str | None) -> None:
    """Record which conversation is active for a workspace.

    The id is stored verbatim; it is not checked against the list of
    existing conversations.

    Args:
        workspace_id: Workspace identifier.
        conversation_id: The conversation ID to set as active, or None to clear.
    """
    store = load_conversations(workspace_id)
    store.update({"active_conversation_id": conversation_id})
    save_conversations(workspace_id, store)

# get_messages_path function · python · L574-L584 (11 LOC)packages/pitlane-agent/src/pitlane_agent/commands/workspace/operations.py
def get_messages_path(workspace_id: str, conversation_id: str) -> Path:
    """Build the location of a conversation's messages file.

    Args:
        workspace_id: The workspace identifier.
        conversation_id: The conversation identifier.

    Returns:
        Path to ``{conversation_id}.messages.json`` inside the workspace
        directory.
    """
    workspace_root = get_workspace_path(workspace_id)
    file_name = f"{conversation_id}.messages.json"
    return workspace_root / file_name

# save_message function · python · L587-L634 (48 LOC)packages/pitlane-agent/src/pitlane_agent/commands/workspace/operations.py
def save_message(workspace_id: str, conversation_id: str, question: str, content: str) -> None:
"""Atomically append a message pair to the conversation's messages file.
Args:
workspace_id: Workspace identifier.
conversation_id: The conversation to append to.
question: The user's question.
content: The assistant's response content.
Raises:
ValueError: If workspace doesn't exist.
"""
if not workspace_exists(workspace_id):
raise ValueError(f"Workspace does not exist for workspace ID: {workspace_id}")
messages_path = get_messages_path(workspace_id, conversation_id)
workspace_path = get_workspace_path(workspace_id)
# Load existing messages
messages = []
if messages_path.exists():
try:
with open(messages_path) as f:
messages = json.load(f)
except (json.JSONDecodeError, OSError):
messages = []
messages.append(
{
"question"Provenance: Repobility (https://repobility.com) — every score reproducible from /scan/
load_messages function · python · L637-L655 (19 LOC)packages/pitlane-agent/src/pitlane_agent/commands/workspace/operations.py
def load_messages(workspace_id: str, conversation_id: str) -> list[dict]:
    """Load all message pairs for a conversation.

    Loading is best-effort: a missing, unreadable, undecodable or malformed
    messages file is treated the same as "no messages yet".

    Args:
        workspace_id: Workspace identifier.
        conversation_id: The conversation to load messages for.

    Returns:
        List of message dicts with 'question', 'content', and 'timestamp' keys.
        Returns empty list if no messages have been saved yet or the file
        cannot be read as a JSON list.
    """
    messages_path = get_messages_path(workspace_id, conversation_id)
    try:
        # Open directly instead of checking exists() first: a missing file
        # raises FileNotFoundError (an OSError), which avoids a race between
        # the check and the open.
        with open(messages_path) as f:
            messages = json.load(f)
    except (OSError, ValueError):
        # ValueError covers both json.JSONDecodeError and UnicodeDecodeError
        # (raised while decoding a corrupt file in text mode).
        return []
    # Honour the list[dict] contract even if the file holds some other
    # JSON document (e.g. an object or null).
    return messages if isinstance(messages, list) else []

# TemporalAnalyzer.analyze method · python · L25-L82 (58 LOC)packages/pitlane-agent/src/pitlane_agent/temporal/analyzer.py
def analyze(self, current_time: datetime) -> TemporalContext:
"""Analyze schedule and determine current temporal state.
Args:
current_time: Current time (UTC)
Returns:
Complete temporal context
"""
# Ensure timezone awareness
if current_time.tzinfo is None:
current_time = current_time.replace(tzinfo=UTC)
# Determine season year
current_season = self._determine_season_year(current_time)
# Fetch schedule
try:
schedule = fastf1.get_event_schedule(current_season, include_testing=False)
except (ValueError, KeyError, AttributeError, ImportError):
# If current season schedule isn't available, try previous year
current_season = current_time.year - 1
schedule = fastf1.get_event_schedule(current_season, include_testing=False)
# Convert schedule to list of events
events = self._parse_schedule(schedule)
TemporalAnalyzer._determine_season_year method · python · L84-L107 (24 LOC)packages/pitlane-agent/src/pitlane_agent/temporal/analyzer.py
def _determine_season_year(self, current_time: datetime) -> int:
    """Pick the F1 season year whose schedule should be queried.

    From March onwards the calendar year is used directly. In January and
    February the current year's schedule is probed through FastF1 and the
    previous year is used when that lookup fails.

    Args:
        current_time: Current time (UTC)

    Returns:
        Season year
    """
    season = current_time.year
    if current_time.month > 2:
        return season

    # Early in the year the new schedule may not be published yet, so probe
    # for it and fall back to last season if the lookup errors out.
    try:
        fastf1.get_event_schedule(season, include_testing=False)
    except (ValueError, KeyError, AttributeError, ImportError):
        return season - 1
    return season

# TemporalAnalyzer._parse_schedule method · python · L109-L148 (40 LOC)packages/pitlane-agent/src/pitlane_agent/temporal/analyzer.py
def _parse_schedule(self, schedule: pd.DataFrame) -> list[dict]:
"""Parse FastF1 schedule DataFrame into list of event dicts.
Args:
schedule: FastF1 event schedule DataFrame
Returns:
List of parsed event dictionaries
"""
events = []
for _, event in schedule.iterrows():
# Parse sessions
sessions = []
for i in range(1, 6):
session_name = event.get(f"Session{i}")
session_date = event.get(f"Session{i}Date")
session_date_utc = event.get(f"Session{i}DateUtc")
if session_name and pd.notna(session_date):
sessions.append(
{
"name": session_name,
"date_local": session_date,
"date_utc": session_date_utc,
}
)
events.append(
TemporalAnalyzer._determine_season_phase method · python · L150-L192 (43 LOC)packages/pitlane-agent/src/pitlane_agent/temporal/analyzer.py
def _determine_season_phase(self, current_time: datetime, events: list[dict]) -> F1Season:
"""Determine current F1 season phase.
Args:
current_time: Current time (UTC)
events: List of race events
Returns:
Season phase
"""
if not events:
return F1Season.OFF_SEASON
# Get first and last race dates
first_race_date = events[0]["event_date"]
last_race_date = events[-1]["event_date"]
# Convert to datetime if needed
if isinstance(first_race_date, pd.Timestamp):
first_race_date = first_race_date.to_pydatetime()
if isinstance(last_race_date, pd.Timestamp):
last_race_date = last_race_date.to_pydatetime()
# Ensure timezone awareness
if first_race_date.tzinfo is None:
first_race_date = first_race_date.replace(tzinfo=UTC)
if last_race_date.tzinfo is None:
last_race_date = last_race_dateTemporalAnalyzer._find_current_weekend method · python · L194-L251 (58 LOC)packages/pitlane-agent/src/pitlane_agent/temporal/analyzer.py
def _find_current_weekend(self, current_time: datetime, events: list[dict]) -> RaceWeekendContext | None:
"""Find active race weekend if any.
A weekend is considered "current" from Thursday 48h before first session
until 6 hours after the race.
Args:
current_time: Current time (UTC)
events: List of race events
Returns:
Current race weekend context or None
"""
for event in events:
sessions = event["sessions"]
if not sessions:
continue
# Parse session dates
parsed_sessions = []
for session in sessions:
date_utc = session["date_utc"]
date_local = session["date_local"]
if isinstance(date_utc, pd.Timestamp):
date_utc = date_utc.to_pydatetime()
if isinstance(date_local, pd.Timestamp):
date_local = date_local.to_pydaTemporalAnalyzer._find_last_completed_race method · python · L253-L275 (23 LOC)packages/pitlane-agent/src/pitlane_agent/temporal/analyzer.py
def _find_last_completed_race(self, current_time: datetime, events: list[dict]) -> RaceWeekendContext | None:
    """Return context for the latest event whose race has finished.

    Every event is checked with ``_is_race_completed``; the last one in
    ``events`` order that passes is taken as the most recent.

    Args:
        current_time: Current time (UTC)
        events: List of race events

    Returns:
        Last completed race context or None
    """
    finished = [candidate for candidate in events if self._is_race_completed(candidate, current_time)]
    if not finished:
        return None

    latest = finished[-1]
    return self._build_race_weekend_context(
        latest,
        self._parse_event_sessions(latest),
        current_time,
        completed=True,
    )

# TemporalAnalyzer._find_next_race method · python · L277-L298 (22 LOC)packages/pitlane-agent/src/pitlane_agent/temporal/analyzer.py
def _find_next_race(self, current_time: datetime, events: list[dict]) -> RaceWeekendContext | None:
"""Find next upcoming race weekend.
Args:
current_time: Current time (UTC)
events: List of race events
Returns:
Next race context or None
"""
for event in events:
event_date = event["event_date"]
if isinstance(event_date, pd.Timestamp):
event_date = event_date.to_pydatetime()
if event_date.tzinfo is None:
event_date = event_date.replace(tzinfo=UTC)
if event_date > current_time:
sessions = self._parse_event_sessions(event)
return self._build_race_weekend_context(event, sessions, current_time)
return NoneRepobility · MCP-ready · https://repobility.com
TemporalAnalyzer._is_race_completed method · python · L300-L328 (29 LOC)packages/pitlane-agent/src/pitlane_agent/temporal/analyzer.py
def _is_race_completed(self, event: dict, current_time: datetime) -> bool:
"""Check if a race is completed.
Args:
event: Event dictionary
current_time: Current time (UTC)
Returns:
True if race is completed
"""
# Find the race session (last session)
sessions = event.get("sessions", [])
if not sessions:
return False
last_session = sessions[-1]
race_date = last_session["date_utc"]
if isinstance(race_date, pd.Timestamp):
race_date = race_date.to_pydatetime()
if race_date.tzinfo is None:
race_date = race_date.replace(tzinfo=UTC)
# Race is completed if it's more than 4 hours in the past
# Extended from 3 to 4 hours to handle edge cases:
# - Red-flagged races (e.g., Belgium 2021, Monaco 2011)
# - Very long races with multiple safety car periods
# - Post-race ceremonies and interviews
TemporalAnalyzer._parse_event_sessions method · python · L330-L366 (37 LOC)packages/pitlane-agent/src/pitlane_agent/temporal/analyzer.py
def _parse_event_sessions(self, event: dict) -> list[dict]:
"""Parse sessions for an event with timezone handling.
Args:
event: Event dictionary
Returns:
List of parsed session dictionaries
"""
parsed_sessions = []
for session in event.get("sessions", []):
date_utc = session["date_utc"]
date_local = session["date_local"]
if isinstance(date_utc, pd.Timestamp):
date_utc = date_utc.to_pydatetime()
if isinstance(date_local, pd.Timestamp):
date_local = date_local.to_pydatetime()
# Ensure UTC times have timezone info
if date_utc.tzinfo is None:
date_utc = date_utc.replace(tzinfo=UTC)
# Keep local times as naive datetimes (no timezone)
# They represent the local time at the circuit
if date_local.tzinfo is not None:
date_local = date_local.replaTemporalAnalyzer._build_race_weekend_context method · python · L368-L465 (98 LOC)packages/pitlane-agent/src/pitlane_agent/temporal/analyzer.py
def _build_race_weekend_context(
self, event: dict, sessions: list[dict], current_time: datetime, completed: bool = False
) -> RaceWeekendContext:
"""Build race weekend context from event data.
Args:
event: Event dictionary
sessions: Parsed sessions
current_time: Current time (UTC)
completed: Whether the race is completed
Returns:
Race weekend context
"""
# Determine session type mapping
session_type_map = {
"Practice 1": "FP1",
"Practice 2": "FP2",
"Practice 3": "FP3",
"Qualifying": "Q",
"Sprint Qualifying": "SQ",
"Sprint": "S",
"Race": "R",
}
# Build session contexts
session_contexts = []
current_session = None
next_session = None
for session in sessions:
session_type = session_type_map.get(session["name"], "UNKNOWN"TemporalAnalyzer._determine_weekend_phase method · python · L467-L506 (40 LOC)packages/pitlane-agent/src/pitlane_agent/temporal/analyzer.py
def _determine_weekend_phase(
self, sessions: list[SessionContext], current_time: datetime, completed: bool
) -> RaceWeekendPhase:
"""Determine current phase of the race weekend.
Args:
sessions: List of session contexts
current_time: Current time (UTC)
completed: Whether the race is completed
Returns:
Race weekend phase
"""
if completed:
return RaceWeekendPhase.POST_RACE
# Check for live or recent sessions
for session in sessions:
if session.is_live:
if "Sprint" in session.name:
return RaceWeekendPhase.SPRINT
elif "Qualifying" in session.name:
return RaceWeekendPhase.QUALIFYING
elif "Race" in session.name:
return RaceWeekendPhase.RACE
elif "Practice" in session.name:
return RaceWeekendPhase.PRACTICE
TemporalAnalyzer._compute_ttl method · python · L508-L548 (41 LOC)packages/pitlane-agent/src/pitlane_agent/temporal/analyzer.py
def _compute_ttl(self, season_phase: F1Season, current_weekend: RaceWeekendContext | None) -> int:
"""Compute cache TTL based on temporal state.
Args:
season_phase: Current season phase
current_weekend: Current race weekend (if any)
Returns:
TTL in seconds
"""
# Off-season: 7 days
if season_phase == F1Season.OFF_SEASON:
return 7 * 24 * 60 * 60
# Pre-season: 3 days
if season_phase == F1Season.PRE_SEASON:
return 3 * 24 * 60 * 60
# Post-season: 5 days
if season_phase == F1Season.POST_SEASON:
return 5 * 24 * 60 * 60
# During race weekend
if current_weekend:
# Check if any session is live
if current_weekend.current_session and current_weekend.current_session.is_live:
return 5 * 60 # 5 minutes during live session
# Check if next session is soon (within 2 hours)
TemporalCache.__init__ method · python · L19-L30 (12 LOC)packages/pitlane-agent/src/pitlane_agent/temporal/cache.py
def __init__(self, cache_dir: Path | None = None):
"""Initialize cache.
Args:
cache_dir: Cache directory (defaults to ~/.pitlane/cache/temporal)
"""
if cache_dir is None:
cache_dir = Path.home() / ".pitlane" / "cache" / "temporal"
self.cache_dir = Path(cache_dir)
self.cache_dir.mkdir(parents=True, exist_ok=True)
self.cache_file = self.cache_dir / "context_cache.json"TemporalCache.get method · python · L32-L65 (34 LOC)packages/pitlane-agent/src/pitlane_agent/temporal/cache.py
def get(self, current_time: datetime) -> TemporalContext | None:
"""Retrieve cached context if valid.
Args:
current_time: Current time (UTC, must be timezone-aware)
Returns:
Cached context if valid, None otherwise
Raises:
ValueError: If current_time is not timezone-aware
"""
if current_time.tzinfo is None:
raise ValueError("current_time must be timezone-aware (include tzinfo)")
if not self.cache_file.exists():
return None
try:
with open(self.cache_file) as f:
data = json.load(f)
# Check if cache is expired
expires_at = datetime.fromisoformat(data["expires_at"])
if current_time > expires_at:
return None
# Deserialize context
context = self._deserialize_context(data["context"])
return context
except (json.JSONDecodeError, KeyError, TemporalCache.set method · python · L67-L88 (22 LOC)packages/pitlane-agent/src/pitlane_agent/temporal/cache.py
def set(self, context: TemporalContext) -> None:
    """Write the context to the cache file together with its expiry.

    The expiry is the context's ``cache_timestamp`` plus its
    ``ttl_seconds``. The file is overwritten with indented JSON.

    Args:
        context: Temporal context to cache
    """
    from datetime import timedelta

    lifetime = timedelta(seconds=context.ttl_seconds)
    expiry = context.cache_timestamp + lifetime

    payload = {
        "timestamp": context.cache_timestamp.isoformat(),
        "ttl_seconds": context.ttl_seconds,
        "expires_at": expiry.isoformat(),
        "context": context.to_dict(),
    }

    with open(self.cache_file, "w") as handle:
        json.dump(payload, handle, indent=2)

# Citation: Repobility (2026). State of AI-Generated Code. https://repobility.com/research/
TemporalCache._deserialize_context method · python · L95-L118 (24 LOC)packages/pitlane-agent/src/pitlane_agent/temporal/cache.py
def _deserialize_context(self, data: dict) -> TemporalContext:
"""Deserialize context from dictionary.
Args:
data: Serialized context dictionary
Returns:
Temporal context
"""
return TemporalContext(
current_time_utc=datetime.fromisoformat(data["current_time_utc"]),
current_season=data["current_season"],
season_phase=F1Season(data["season_phase"]),
current_weekend=self._deserialize_weekend(data["current_weekend"]) if data["current_weekend"] else None,
last_completed_race=(
self._deserialize_weekend(data["last_completed_race"]) if data["last_completed_race"] else None
),
next_race=self._deserialize_weekend(data["next_race"]) if data["next_race"] else None,
races_completed=data["races_completed"],
races_remaining=data["races_remaining"],
days_until_next_race=data["days_until_next_race"]TemporalCache._deserialize_weekend method · python · L120-L140 (21 LOC)packages/pitlane-agent/src/pitlane_agent/temporal/cache.py
def _deserialize_weekend(self, data: dict) -> RaceWeekendContext:
    """Rebuild a race weekend context from its serialized dictionary.

    ``event_date`` is parsed from ISO format, ``phase`` is converted back to
    its enum, and nested sessions are rebuilt via ``_deserialize_session``.
    Falsy ``current_session`` / ``next_session`` entries become None.

    Args:
        data: Serialized weekend dictionary

    Returns:
        Race weekend context
    """
    # Fields are read in constructor order so a missing key surfaces the
    # same KeyError as a direct keyword-by-keyword construction would.
    fields = {
        "round_number": data["round_number"],
        "event_name": data["event_name"],
        "country": data["country"],
        "location": data["location"],
        "event_date": datetime.fromisoformat(data["event_date"]),
        "phase": RaceWeekendPhase(data["phase"]),
    }
    for optional_key in ("current_session", "next_session"):
        raw_session = data[optional_key]
        fields[optional_key] = self._deserialize_session(raw_session) if raw_session else None
    fields["all_sessions"] = [self._deserialize_session(item) for item in data["all_sessions"]]
    fields["is_sprint_weekend"] = data["is_sprint_weekend"]
    return RaceWeekendContext(**fields)

# TemporalCache._deserialize_session method · python · L142-L160 (19 LOC)packages/pitlane-agent/src/pitlane_agent/temporal/cache.py
def _deserialize_session(self, data: dict) -> SessionContext:
    """Rebuild a session context from its serialized dictionary.

    ``date_utc`` and ``date_local`` are parsed from ISO format strings; all
    other fields are passed through unchanged.

    Args:
        data: Serialized session dictionary

    Returns:
        Session context
    """
    iso_fields = ("date_utc", "date_local")
    field_order = (
        "name",
        "session_type",
        "date_utc",
        "date_local",
        "is_live",
        "is_recent",
        "minutes_until",
        "minutes_since",
    )
    kwargs = {}
    for key in field_order:
        raw = data[key]
        kwargs[key] = datetime.fromisoformat(raw) if key in iso_fields else raw
    return SessionContext(**kwargs)

# SessionContext.to_dict method · python · L42-L53 (12 LOC)packages/pitlane-agent/src/pitlane_agent/temporal/context.py
def to_dict(self) -> dict:
    """Serialize the session to a plain dict, with dates as ISO strings."""
    iso_fields = ("date_utc", "date_local")
    field_order = (
        "name",
        "session_type",
        "date_utc",
        "date_local",
        "is_live",
        "is_recent",
        "minutes_until",
        "minutes_since",
    )
    payload = {}
    for attr in field_order:
        value = getattr(self, attr)
        payload[attr] = value.isoformat() if attr in iso_fields else value
    return payload

# RaceWeekendContext.to_dict method · python · L71-L84 (14 LOC)packages/pitlane-agent/src/pitlane_agent/temporal/context.py
def to_dict(self) -> dict:
    """Serialize the race weekend, including nested sessions, to a dict.

    ``event_date`` becomes an ISO string, ``phase`` is stored by enum value,
    and a falsy ``current_session`` / ``next_session`` is stored as None.
    """
    payload = {
        "round_number": self.round_number,
        "event_name": self.event_name,
        "country": self.country,
        "location": self.location,
        "event_date": self.event_date.isoformat(),
        "phase": self.phase.value,
    }
    for attr in ("current_session", "next_session"):
        payload[attr] = getattr(self, attr).to_dict() if getattr(self, attr) else None
    payload["all_sessions"] = [session.to_dict() for session in self.all_sessions]
    payload["is_sprint_weekend"] = self.is_sprint_weekend
    return payload

# TemporalContext.to_dict method · python · L113-L127 (15 LOC)packages/pitlane-agent/src/pitlane_agent/temporal/context.py
def to_dict(self) -> dict:
    """Serialize the full temporal context to a JSON-friendly dict.

    Datetimes become ISO strings, the season phase is stored by enum value,
    and falsy weekend/race contexts are stored as None.
    """
    payload = {
        "current_time_utc": self.current_time_utc.isoformat(),
        "current_season": self.current_season,
        "season_phase": self.season_phase.value,
    }
    for attr in ("current_weekend", "last_completed_race", "next_race"):
        payload[attr] = getattr(self, attr).to_dict() if getattr(self, attr) else None
    payload["races_completed"] = self.races_completed
    payload["races_remaining"] = self.races_remaining
    payload["days_until_next_race"] = self.days_until_next_race
    payload["cache_timestamp"] = self.cache_timestamp.isoformat()
    payload["ttl_seconds"] = self.ttl_seconds
    return payload

# TemporalContextManager.__init__ method · python · L133-L143 (11 LOC)packages/pitlane-agent/src/pitlane_agent/temporal/context.py
def __init__(self, cache_dir: Path | None = None):
    """Create the cache and analyzer this manager coordinates.

    Args:
        cache_dir: Override cache directory (defaults to ~/.pitlane/cache/temporal)
    """
    # Imported at call time rather than module level.
    # NOTE(review): presumably this avoids a circular import between the
    # temporal modules; confirm before hoisting.
    from pitlane_agent.temporal.analyzer import TemporalAnalyzer
    from pitlane_agent.temporal.cache import TemporalCache

    self.cache = TemporalCache(cache_dir)
    self.analyzer = TemporalAnalyzer()

# TemporalContextManager.get_context method · python · L145-L169 (25 LOC)packages/pitlane-agent/src/pitlane_agent/temporal/context.py
def get_context(self, force_refresh: bool = False, current_time: datetime | None = None) -> TemporalContext:
"""Get current temporal context.
Args:
force_refresh: Force fetch from FastF1 (ignore cache)
current_time: Override current time (for testing)
Returns:
Complete temporal context
"""
now = current_time or datetime.now(UTC)
# Check cache
if not force_refresh:
cached = self.cache.get(now)
if cached:
return cached
# Fetch and analyze
context = self.analyzer.analyze(now)
# Cache with intelligent TTL
self.cache.set(context)
return contextGenerated by Repobility's multi-pass static-analysis pipeline (https://repobility.com)
get_temporal_context function · python · L176-L197 (22 LOC)packages/pitlane-agent/src/pitlane_agent/temporal/context.py
def get_temporal_context(force_refresh: bool = False, current_time: datetime | None = None) -> TemporalContext:
    """Get current F1 temporal context.

    This is the primary public API for accessing temporal context. A single
    module-level ``TemporalContextManager`` is created on first use and
    reused by later calls.

    Args:
        force_refresh: Force fetch from FastF1 (ignore cache)
        current_time: Override current time (for testing)

    Returns:
        Complete temporal context

    Example:
        >>> context = get_temporal_context()
        >>> print(f"Current season: {context.current_season}")
        >>> if context.current_weekend:
        ...     print(f"Race weekend: {context.current_weekend.event_name}")
    """
    global _manager
    manager = _manager
    if manager is None:
        # First call: build the shared manager and remember it.
        manager = _manager = TemporalContextManager()
    return manager.get_context(force_refresh=force_refresh, current_time=current_time)

# format_for_system_prompt function · python · L10-L25 (16 LOC)packages/pitlane-agent/src/pitlane_agent/temporal/formatter.py
def format_for_system_prompt(context: TemporalContext, verbosity: str = "normal") -> str:
    """Render the temporal context as text for a system prompt.

    Args:
        context: Temporal context
        verbosity: "minimal", "normal", or "detailed"; any other value is
            treated as "normal".

    Returns:
        Formatted string for system prompt
    """
    if verbosity == "minimal":
        return _format_minimal(context)
    if verbosity == "detailed":
        return _format_detailed(context)
    # "normal" and any unrecognised verbosity share the default rendering.
    return _format_normal(context)

# _format_minimal function · python · L28-L57 (30 LOC)packages/pitlane-agent/src/pitlane_agent/temporal/formatter.py
def _format_minimal(context: TemporalContext) -> str:
    """Minimal format for system prompt.

    Produces a heading, the season with its phase, and one line each for
    the next race and the last completed race when they are known. The
    days-until countdown is appended to the "Next Race" line itself.

    Args:
        context: Temporal context

    Returns:
        Formatted string
    """
    phase_label = context.season_phase.value.replace("_", " ").title()
    lines = [
        "## F1 Temporal Context",
        "",
        f"**Season:** {context.current_season} ({phase_label})",
    ]

    upcoming = context.next_race
    if upcoming:
        next_line = (
            f"**Next Race:** {upcoming.event_name} (Round {upcoming.round_number}) - "
            f"{upcoming.event_date.strftime('%B %d, %Y')}"
        )
        if context.days_until_next_race is not None:
            # Keep the countdown on the same line as the race it refers to;
            # appending it as its own entry would put " (N days)" on a
            # separate line once the lines are joined.
            next_line += f" ({context.days_until_next_race} days)"
        lines.append(next_line)

    previous = context.last_completed_race
    if previous:
        lines.append(
            f"**Last Race:** {previous.event_name} - "
            f"Completed {previous.event_date.strftime('%B %d, %Y')}"
        )

    return "\n".join(lines)

# _format_normal function · python · L60-L137 (78 LOC)packages/pitlane-agent/src/pitlane_agent/temporal/formatter.py
def _format_normal(context: TemporalContext) -> str:
"""Normal format for system prompt.
Args:
context: Temporal context
Returns:
Formatted string
"""
lines = ["## F1 Temporal Context", ""]
# Current season and phase
lines.append(f"**Current Season:** {context.current_season}")
phase_display = context.season_phase.value.replace("_", " ").title()
if context.season_phase == F1Season.IN_SEASON:
lines.append(
f"**Phase:** {phase_display} "
f"(Round {context.races_completed} of {context.races_completed + context.races_remaining} completed)"
)
else:
lines.append(f"**Phase:** {phase_display}")
lines.append("")
# Current race weekend (if any)
if context.current_weekend:
lines.extend(_format_current_weekend(context.current_weekend))
lines.append("")
# Next race
if context.next_race and not context.current_weekend:
lines.append(f"**Next Race:*_format_detailed function · python · L140-L205 (66 LOC)packages/pitlane-agent/src/pitlane_agent/temporal/formatter.py
def _format_detailed(context: TemporalContext) -> str:
"""Detailed format for system prompt.
Args:
context: Temporal context
Returns:
Formatted string
"""
lines = ["## F1 Temporal Context", ""]
# Current season and phase
lines.append(f"**Current Season:** {context.current_season}")
phase_display = context.season_phase.value.replace("_", " ").title()
if context.season_phase == F1Season.IN_SEASON:
lines.append(
f"**Phase:** {phase_display} "
f"(Round {context.races_completed} of {context.races_completed + context.races_remaining})"
)
else:
lines.append(f"**Phase:** {phase_display}")
lines.append("")
# Current race weekend (if any)
if context.current_weekend:
lines.extend(_format_current_weekend_detailed(context.current_weekend, context.current_time_utc))
lines.append("")
# Next race (if not current weekend)
if context.next_race and not context_format_current_weekend function · python · L208-L260 (53 LOC)packages/pitlane-agent/src/pitlane_agent/temporal/formatter.py
def _format_current_weekend(weekend: "RaceWeekendContext") -> list[str]:
"""Format current race weekend for normal verbosity.
Args:
weekend: Race weekend context
Returns:
List of formatted lines
"""
lines = [f"**ACTIVE RACE WEEKEND: {weekend.event_name}**"]
lines.append(f"- Round {weekend.round_number} in {weekend.location}, {weekend.country}")
if weekend.is_sprint_weekend:
lines.append("- Event Format: Sprint weekend")
# Current session
if weekend.current_session:
lines.append("")
if weekend.current_session.is_live:
lines.append(f"**Current Session:** {weekend.current_session.name} ⚡ LIVE")
if weekend.current_session.minutes_since:
lines.append(f"- Started {weekend.current_session.minutes_since} minutes ago")
else:
lines.append(f"**Recent Session:** {weekend.current_session.name}")
if weekend.current_session.minutes_since:
_format_current_weekend_detailed function · python · L263-L325 (63 LOC)packages/pitlane-agent/src/pitlane_agent/temporal/formatter.py
def _format_current_weekend_detailed(weekend: "RaceWeekendContext", current_time) -> list[str]:
"""Format current race weekend for detailed verbosity.
Args:
weekend: Race weekend context
current_time: Current time (UTC)
Returns:
List of formatted lines
"""
lines = [f"**ACTIVE RACE WEEKEND: {weekend.event_name}**"]
lines.append(f"- Round {weekend.round_number} in {weekend.location}, {weekend.country}")
lines.append(f"- Event Date: {weekend.event_date.strftime('%B %d, %Y')}")
if weekend.is_sprint_weekend:
lines.append("- Event Format: Sprint weekend")
else:
lines.append("- Event Format: Conventional weekend")
# Current session
if weekend.current_session:
lines.append("")
if weekend.current_session.is_live:
lines.append(f"**⚡ LIVE SESSION: {weekend.current_session.name}**")
if weekend.current_session.minutes_since:
lines.append(f"- Started {weekenformat_as_text function · python · L328-L412 (85 LOC)packages/pitlane-agent/src/pitlane_agent/temporal/formatter.py
def format_as_text(context: TemporalContext) -> str:
"""Format temporal context as human-readable text (for CLI display).
Args:
context: Temporal context
Returns:
Formatted text
"""
lines = ["=" * 60]
lines.append("F1 TEMPORAL CONTEXT")
lines.append("=" * 60)
lines.append("")
# Basic info
lines.append(f"Current Time (UTC): {context.current_time_utc.strftime('%Y-%m-%d %H:%M:%S')}")
lines.append(f"Season: {context.current_season}")
lines.append(f"Phase: {context.season_phase.value.replace('_', ' ').title()}")
lines.append(f"Races Completed: {context.races_completed}")
lines.append(f"Races Remaining: {context.races_remaining}")
lines.append("")
# Current weekend
if context.current_weekend:
lines.append("-" * 60)
lines.append("CURRENT RACE WEEKEND")
lines.append("-" * 60)
lines.append(f"Event: {context.current_weekend.event_name}")
lines.append(f"Round: {context.cProvenance: Repobility (https://repobility.com) — every score reproducible from /scan/
_is_allowed_bash_command function · python · L37-L75 (39 LOC)packages/pitlane-agent/src/pitlane_agent/tool_permissions.py
def _is_allowed_bash_command(command: str) -> bool:
"""Check if Bash command is allowed (pitlane CLI only).
Validates that:
1. Only whitelisted environment variables are used
2. The command starts with "pitlane "
Args:
command: The bash command to validate.
Returns:
True if command is allowed, False otherwise.
"""
if not command:
return False
cmd = command.strip()
parts = cmd.split()
# Extract and validate environment variables
env_vars = []
for i, part in enumerate(parts):
if "=" not in part:
# Found the actual command (not an env var assignment)
cmd = " ".join(parts[i:])
break
# Extract env var name before '='
env_var = part.split("=")[0]
env_vars.append(env_var)
else:
# All parts are env vars, no command
return False
# Check if any env var is not in whitelist
for env_var in env_vars:
if env_var not i_is_within_workspace function · python · L78-L100 (23 LOC)packages/pitlane-agent/src/pitlane_agent/tool_permissions.py
def _is_within_workspace(file_path: str, workspace_dir: str | None) -> bool:
"""Check if file path is within the workspace directory.
Args:
file_path: The file path to validate.
workspace_dir: The workspace directory path.
Returns:
True if file is within workspace, False otherwise.
"""
if workspace_dir is None:
return False
try:
from pathlib import Path
file_resolved = Path(file_path).resolve()
workspace_resolved = Path(workspace_dir).resolve()
# Check if file_path is within workspace_dir
return str(file_resolved).startswith(str(workspace_resolved))
except Exception:
return Falsecan_use_tool function · python · L103-L249 (147 LOC)packages/pitlane-agent/src/pitlane_agent/tool_permissions.py
async def can_use_tool(
tool_name: str,
input_params: dict[str, Any],
context: ToolPermissionContext | dict[str, Any],
) -> PermissionResultAllow | PermissionResultDeny:
"""Validate tool usage with restrictions for Bash, Read, Write, and WebFetch.
Args:
tool_name: Name of the tool being invoked.
input_params: Parameters passed to the tool.
context: Permission context including workspace directory (dict or ToolPermissionContext).
Returns:
PermissionResultAllow if the tool usage is permitted.
PermissionResultDeny if the tool usage should be blocked.
"""
# Convert context to dict if needed
context_dict = context if isinstance(context, dict) else {}
# Restrict Bash to pitlane CLI only
if tool_name == "Bash":
command = input_params.get("command", "")
if not _is_allowed_bash_command(command):
denial_msg = (
"Bash is restricted to 'pitlane' CLI commands only. "
is_tracing_enabled function · python · L40-L51 (12 LOC)packages/pitlane-agent/src/pitlane_agent/tracing.py
def is_tracing_enabled() -> bool:
    """Report whether tracing is currently switched on.

    The module-level ``_tracing_enabled`` flag takes precedence whenever it
    is not None. Only when that flag is None is the
    ``PITLANE_TRACING_ENABLED`` environment variable consulted, and tracing
    counts as enabled only if its value is exactly ``"1"``.

    Returns:
        True if tracing is enabled, False otherwise.
    """
    if _tracing_enabled is None:
        # No programmatic override: defer to the environment (default "0").
        return os.getenv("PITLANE_TRACING_ENABLED", "0") == "1"

    # A programmatic setting wins over the environment variable.
    return _tracing_enabled
def _initialize_tracer_provider() -> None:
"""Initialize the OpenTelemetry tracer provider with console exporter.
This is called automatically when tracing is enabled. It sets up a console
exporter that outputs to stderr to avoid mixing with agent responses.
The span processor can be configured via PITLANE_SPAN_PROCESSOR environment
variable:
- "simple" (default): Uses SimpleSpanProcessor for synchronous export.
Required for tests to ensure spans are flushed before assertions.
- "batch": Uses BatchSpanProcessor for better production performance.
Spans are exported asynchronously in batches.
"""
global _provider_initialized
if _provider_initialized:
return
# Create resource with service name
resource = Resource.create({"service.name": "pitlane-f1-agent"})
# Create tracer provider
provider = TracerProvider(resource=resource)
# Create console exporter (outputs to stderr)
console_exporter = ConsoleSpaget_tracer function · python · L97-L121 (25 LOC)packages/pitlane-agent/src/pitlane_agent/tracing.py
def get_tracer() -> trace.Tracer:
    """Return the tracer used by this module, creating it on first use.

    While tracing is enabled, the first call runs
    ``_initialize_tracer_provider()`` and caches a tracer (instrumentation
    version ``"0.1.0"``) in the module-level ``_tracer``; later calls return
    that cached instance. While tracing is disabled, nothing is initialized
    or cached and a tracer is fetched directly from ``trace.get_tracer``.

    Note:
        The lazy initialization is not guarded by a lock. The original
        author documents concurrent first calls as harmless; that depends on
        ``_initialize_tracer_provider`` and the OpenTelemetry API and is not
        verified here.

    Returns:
        Tracer instance (expected to be a no-op tracer when tracing is
        disabled and no provider has been installed).
    """
    global _tracer

    if is_tracing_enabled():
        if _tracer is None:
            # First enabled call: set up the provider, then cache the tracer.
            _initialize_tracer_provider()
            _tracer = trace.get_tracer(__name__, "0.1.0")
        return _tracer

    # Tracing disabled: hand back a tracer without touching the provider.
    return trace.get_tracer(__name__)
def enable_tracing() -> None:
    """Turn tracing on from code.

    Sets the module-level ``_tracing_enabled`` flag to True, which takes
    precedence over the PITLANE_TRACING_ENABLED environment variable.
    """
    global _tracing_enabled

    _tracing_enabled = True
def tool_span(tool_name: str, **attributes: Any):
"""Create a span for a tool call with minimal console output.
This context manager creates a span with the tool name and optional
attributes. The console exporter will output a simple log line like:
[TOOL] ToolName: key_parameter
Args:
tool_name: Name of the tool being called (e.g., "Bash", "Skill", "WebFetch").
**attributes: Span attributes to record (e.g., tool.key_param, tool.permission).
Example:
with tool_span("Bash", **{"tool.key_param": "ls -la"}):
# Execute tool
pass
"""
tracer = get_tracer()
if not is_tracing_enabled():
# If tracing is disabled, just yield without creating a span
yield None
return
with tracer.start_as_current_span(f"tool.{tool_name}") as span:
# Set tool name attribute
span.set_attribute("tool.name", tool_name)
# Set additional attributes
for key, value in attribuRepobility · MCP-ready · https://repobility.com
_log_tool_call function · python · L178-L200 (23 LOC)packages/pitlane-agent/src/pitlane_agent/tracing.py
def _log_tool_call(tool_name: str, attributes: dict[str, Any]) -> None:
    """Write a one-line summary of a tool call to stderr.

    Output format:
        ``[TOOL] ToolName: key_parameter`` for ordinary calls, with
        ``→ DENIED`` appended (plus the denial reason in parentheses, when
        one is present) if ``tool.permission`` equals ``"denied"``.

    Args:
        tool_name: Name of the tool.
        attributes: Span attributes; the keys read are ``tool.key_param``,
            ``tool.permission`` and ``tool.denial_reason``, each defaulting
            to an empty string when absent.
    """
    param = attributes.get("tool.key_param", "")
    was_denied = attributes.get("tool.permission", "") == "denied"

    if not was_denied:
        print(f"[TOOL] {tool_name}: {param}", file=sys.stderr, flush=True)
        return

    # Denied call: mention the reason only if one was supplied.
    reason = attributes.get("tool.denial_reason", "")
    suffix = f" ({reason})" if reason else ""
    print(f"[TOOL] {tool_name}: {param} → DENIED{suffix}", file=sys.stderr, flush=True)
def log_permission_check(tool_name: str, allowed: bool, reason: str = "") -> None:
    """Report a denied permission check on stderr.

    Nothing is written when tracing is disabled or when the tool was
    allowed; only denials produce output, in the form
    ``[PERMISSION] ToolName → DENIED`` followed by ``: reason`` when a
    reason is given.

    Args:
        tool_name: Name of the tool.
        allowed: Whether the tool was allowed.
        reason: Reason for denial (if denied).
    """
    # The tracing check comes first, matching the original evaluation order.
    if not is_tracing_enabled() or allowed:
        return

    suffix = f": {reason}" if reason else ""
    print(f"[PERMISSION] {tool_name} → DENIED{suffix}", file=sys.stderr, flush=True)
async def pre_tool_use_hook(
    hook_input: PreToolUseHookInput,
    block_reason: str | None,
    hook_context: HookContext,
) -> SyncHookJSONOutput:
    """Log an upcoming tool call, then let execution continue.

    When tracing is enabled, the tool's key parameter is derived with
    ``_extract_key_param`` and a one-line entry is emitted through
    ``_log_tool_call``. The hook returns ``continue_=True`` on every path
    visible here.

    Args:
        hook_input: Input data; ``tool_name`` and ``tool_input`` are read.
        block_reason: Not used by this function.
        hook_context: Not used by this function.

    Returns:
        SyncHookJSONOutput with ``continue_=True``.
    """
    if is_tracing_enabled():
        name = hook_input["tool_name"]
        # Reduce the tool input to the single parameter worth showing.
        summary = _extract_key_param(name, hook_input["tool_input"])
        _log_tool_call(name, {"tool.key_param": summary})

    return SyncHookJSONOutput(continue_=True)