Function bodies 59 total
Direction class · python · L15-L25 (11 LOC) · crossroads/core/intersection.py
class Direction(StrEnum):
"""Cardinal directions for a 4-way intersection.
Values match the string keys used to access lights and vehicles
via ``intersection.lights['North']`` etc.
"""
NORTH = "North"
SOUTH = "South"
EAST = "East"
WEST = "West"IntersectionConfig class · python · L29-L50 (22 LOC)crossroads/core/intersection.py
class IntersectionConfig:
    """Configuration for intersection timing and collision detection.

    Attributes:
        green_duration: Duration of the GREEN phase in seconds.
        yellow_duration: Duration of the YELLOW phase in seconds.
        all_red_duration: Duration of the all-RED safety clearance in seconds.
        vehicle_width: Minimum safe distance between vehicles in meters.
        stop_line_distance: Stop-line position assigned to a vehicle's
            ``stop_distance`` when it is added to the intersection
            (presumably meters, like ``vehicle_width``).
    """

    green_duration: float = 30.0
    yellow_duration: float = 5.0
    all_red_duration: float = 2.0
    vehicle_width: float = 2.0
    stop_line_distance: float = 100.0

    def __post_init__(self) -> None:
        """Validate that every setting is a non-negative number.

        Raises:
            ValueError: If any field is negative or NaN.
        """
        for name in (
            "green_duration",
            "yellow_duration",
            "all_red_duration",
            "vehicle_width",
            "stop_line_distance",
        ):
            value = getattr(self, name)
            # ``not value >= 0`` (rather than ``value < 0``) also rejects
            # NaN, for which every ordering comparison is False.
            if not value >= 0:
                raise ValueError(f"{name} must be >= 0, got {value}")
def __post_init__(self) -> None:
    """Validate that every setting is a non-negative number.

    Checks the three phase durations as well as ``vehicle_width`` and
    ``stop_line_distance``.

    Raises:
        ValueError: If any checked field is negative or NaN.
    """
    for name in (
        "green_duration",
        "yellow_duration",
        "all_red_duration",
        "vehicle_width",
        "stop_line_distance",
    ):
        value = getattr(self, name)
        # ``not value >= 0`` (rather than ``value < 0``) also rejects NaN,
        # for which every ordering comparison is False.
        if not value >= 0:
            raise ValueError(f"{name} must be >= 0, got {value}")
class Intersection:
"""4-way intersection coordinating traffic lights and vehicles.
Manages four traffic lights (North, South, East, West) with
coordinated phases so that N/S and E/W never have GREEN simultaneously.
Vehicles are registered as observers on their direction's traffic light
and receive phase-change notifications automatically.
Args:
config: Intersection timing configuration. Defaults to
``IntersectionConfig()`` if ``None``.
Example:
>>> intersection = Intersection()
>>> intersection.lights['North'].phase
<TrafficLightPhase.GREEN: 'GREEN'>
>>> intersection.lights['East'].phase
<TrafficLightPhase.RED: 'RED'>
"""
def __init__(self, config: IntersectionConfig | None = None) -> None:
self._config = config if config is not None else IntersectionConfig()
# Calculate red_duration so that opposing pair is red while the other
# pair completes green + yellow + __init__ method · python · L73-L105 (33 LOC)crossroads/core/intersection.py
def __init__(self, config: IntersectionConfig | None = None) -> None:
self._config = config if config is not None else IntersectionConfig()
# Calculate red_duration so that opposing pair is red while the other
# pair completes green + yellow + all_red
red_duration = (
self._config.green_duration
+ self._config.yellow_duration
+ self._config.all_red_duration
)
light_config = TrafficLightConfig(
red_duration=red_duration,
green_duration=self._config.green_duration,
yellow_duration=self._config.yellow_duration,
)
# Create all 4 lights — all start at RED by default
self._lights: dict[str, TrafficLight] = {
d.value: TrafficLight(light_config) for d in Direction
}
# Pre-advance N/S lights past their initial RED phase so they start GREEN
# No vehicles registered yet, so observer notifications are safe (no-opadd_vehicle method · python · L127-L147 (21 LOC)crossroads/core/intersection.py
def add_vehicle(self, vehicle: Vehicle, direction: str) -> None:
    """Add a vehicle to the intersection for a given direction.

    Registers the vehicle as an observer on the corresponding traffic
    light, tells it the light's current phase, and assigns the configured
    stop-line position.

    Args:
        vehicle: The vehicle to add.
        direction: Direction name (e.g. "North", "South", "East", "West").

    Raises:
        ValueError: If direction is not a valid direction name.
    """
    if direction not in self._vehicles:
        raise ValueError(
            f"Invalid direction '{direction}'. "
            f"Must be one of: {list(self._vehicles.keys())}"
        )
    light = self._lights[direction]
    self._vehicles[direction].append(vehicle)
    light.register_observer(vehicle.on_signal_change)
    # Observers are only called on *future* transitions. Without this
    # initial sync a vehicle added while the light is RED or YELLOW would
    # keep a signal phase of None until the next phase change.
    vehicle.on_signal_change(light, light.phase)
    # Assigned last: on_signal_change clears the stop distance on GREEN.
    vehicle.stop_distance = self._config.stop_line_distance
def remove_vehicle(self, vehicle: Vehicle, direction: str) -> None:
    """Remove a vehicle from the intersection.

    Note: Observer deregistration is not performed since TrafficLight
    does not support it. The observer list only grows, which is
    acceptable for simulation scope.

    Args:
        vehicle: The vehicle to remove.
        direction: Direction name (e.g. "North", "South", "East", "West").

    Raises:
        ValueError: If direction is not a valid direction name, or if the
            vehicle is not currently registered for that direction.
    """
    if direction not in self._vehicles:
        raise ValueError(
            f"Invalid direction '{direction}'. "
            f"Must be one of: {list(self._vehicles.keys())}"
        )
    try:
        self._vehicles[direction].remove(vehicle)
    except ValueError:
        # Replace the opaque "list.remove(x): x not in list" message with
        # one that names the vehicle and the direction.
        raise ValueError(
            f"Vehicle {vehicle!r} is not registered for direction "
            f"'{direction}'"
        ) from None
update method · python · L170-L202 (33 LOC) · crossroads/core/intersection.py
def update(self, dt: float) -> None:
"""Advance the intersection simulation by one timestep.
Updates all traffic lights, assigns leader vehicles within each
direction, updates all vehicle physics, and runs collision detection.
Args:
dt: Time delta in seconds since the last update.
"""
# Update all traffic lights
for light in self._lights.values():
light.update(dt)
# Per direction: assign leaders then update vehicles
for direction_vehicles in self._vehicles.values():
if not direction_vehicles:
continue
# Sort by position ascending (rear to front)
sorted_vehicles = sorted(direction_vehicles, key=lambda v: v.position)
# Assign leaders: each vehicle follows the one ahead
for i in range(len(sorted_vehicles) - 1):
sorted_vehicles[i].leader = sorted_vehicles[i + 1]
# Frontmost vehicle ha_check_collisions method · python · L204-L233 (30 LOC)crossroads/core/intersection.py
def _check_collisions(self) -> None:
"""Check for collisions between consecutive vehicles per direction.
Vehicles closer than ``vehicle_width`` in the same direction are
recorded as collision pairs. This is a diagnostic tool — IDM
physics should prevent collisions proactively.
"""
vehicle_width = self._config.vehicle_width
self._collisions.clear()
for direction, direction_vehicles in self._vehicles.items():
if len(direction_vehicles) < 2:
continue
sorted_vehicles = sorted(direction_vehicles, key=lambda v: v.position)
for i in range(len(sorted_vehicles) - 1):
rear = sorted_vehicles[i]
front = sorted_vehicles[i + 1]
gap = front.position - rear.position
if gap < vehicle_width:
self._collisions.append((rear, front))
self._collision_count += 1
loTrafficLightPhase class · python · L17-L27 (11 LOC)crossroads/core/traffic_light.py
class TrafficLightPhase(StrEnum):
"""Phases of a traffic light signal.
Uses explicit string values so that ``TrafficLightPhase.RED == "RED"``
evaluates to ``True``, ensuring compatibility with string-based phase
references throughout the codebase.
"""
RED = "RED"
GREEN = "GREEN"
YELLOW = "YELLOW"TrafficLightConfig class · python · L39-L57 (19 LOC)crossroads/core/traffic_light.py
class TrafficLightConfig:
    """Configuration for traffic light phase durations.

    Attributes:
        red_duration: Duration of the RED phase in seconds.
        green_duration: Duration of the GREEN phase in seconds.
        yellow_duration: Duration of the YELLOW phase in seconds.
    """

    red_duration: float = 30.0
    green_duration: float = 30.0
    yellow_duration: float = 5.0

    def __post_init__(self) -> None:
        """Validate that all durations are non-negative numbers.

        Raises:
            ValueError: If any duration is negative or NaN.
        """
        for name in ("red_duration", "green_duration", "yellow_duration"):
            value = getattr(self, name)
            # ``not value >= 0`` (rather than ``value < 0``) also rejects
            # NaN, for which every ordering comparison is False.
            if not value >= 0:
                raise ValueError(f"{name} must be >= 0, got {value}")
def __post_init__(self) -> None:
    """Validate that all durations are non-negative numbers.

    Raises:
        ValueError: If any duration is negative or NaN.
    """
    for name in ("red_duration", "green_duration", "yellow_duration"):
        value = getattr(self, name)
        # ``not value >= 0`` (rather than ``value < 0``) also rejects NaN,
        # for which every ordering comparison is False.
        if not value >= 0:
            raise ValueError(f"{name} must be >= 0, got {value}")
class TrafficLight:
"""Time-driven finite state machine for a single traffic light.
The traffic light cycles through RED -> GREEN -> YELLOW -> RED
with configurable durations. External code drives the FSM by
calling ``update(dt)`` each simulation timestep.
Observers can register callbacks to be notified on phase changes.
Args:
config: Phase duration configuration. Defaults to
``TrafficLightConfig()`` if ``None``.
Example:
>>> light = TrafficLight()
>>> light.phase
<TrafficLightPhase.RED: 'RED'>
>>> light.update(30.0) # Exceed red_duration
>>> light.phase
<TrafficLightPhase.GREEN: 'GREEN'>
"""
def __init__(self, config: TrafficLightConfig | None = None) -> None:
self._config = config if config is not None else TrafficLightConfig()
self._phase: TrafficLightPhase = TrafficLightPhase.RED
self._elapsed: float = 0.0
self._observers: list[Callable[[Traf__init__ method · python · L82-L86 (5 LOC)crossroads/core/traffic_light.py
def __init__(self, config: TrafficLightConfig | None = None) -> None:
    """Create a light in the RED phase with no elapsed time and no observers.

    Args:
        config: Phase duration configuration. A default
            ``TrafficLightConfig()`` is created when ``None``.
    """
    if config is None:
        config = TrafficLightConfig()
    self._config = config
    self._phase: TrafficLightPhase = TrafficLightPhase.RED
    self._elapsed: float = 0.0
    self._observers: list[Callable[[TrafficLight, TrafficLightPhase], None]] = []
def register_observer(
    self,
    callback: Callable[[TrafficLight, TrafficLightPhase], None],
) -> None:
    """Add a callback to the list notified on every phase change.

    Args:
        callback: Callable invoked as ``callback(light, new_phase)`` each
            time this traffic light moves to a new phase.
    """
    observers = self._observers
    observers.append(callback)
update method · python · L104-L130 (27 LOC) · crossroads/core/traffic_light.py
def update(self, dt: float) -> None:
    """Advance the traffic light by *dt* seconds.

    Adds *dt* to the time spent in the current phase and moves to the
    next phase each time the accumulated time reaches the current phase's
    configured duration. Surplus time carries into the following phase.

    No more transitions are made per call than there are phases, so a
    zero-length phase cannot cause an endless loop.

    Args:
        dt: Time delta in seconds since the last update.
    """
    self._elapsed += dt

    # One pass per phase at most: a full cycle is the per-call ceiling.
    for _ in range(len(TrafficLightPhase)):
        phase_length = self._phase_duration()
        if self._elapsed < phase_length:
            return
        self._elapsed -= phase_length
        self._transition()
def _phase_duration(self) -> float:
    """Look up how long the current phase is configured to last.

    Returns:
        ``red_duration`` for RED, ``green_duration`` for GREEN, and
        ``yellow_duration`` for anything else.
    """
    phase = self._phase
    if phase is TrafficLightPhase.RED:
        return self._config.red_duration
    elif phase is TrafficLightPhase.GREEN:
        return self._config.green_duration
    else:
        return self._config.yellow_duration
def _transition(self) -> None:
    """Switch to the successor of the current phase, then notify observers."""
    successor = _NEXT_PHASE[self._phase]
    self._phase = successor
    self._notify_observers()
def _notify_observers(self) -> None:
    """Call every registered observer with the new phase.

    Each call is wrapped in a try/except so that a failing observer
    does not prevent subsequent observers from being notified.

    The observer list is snapshotted before iterating, so an observer that
    registers another observer while being notified does not extend the
    current round. The new observer is first called on the next
    transition.
    """
    for observer in tuple(self._observers):
        try:
            # The phase is read per call on purpose: if an observer drives
            # a further transition, later observers get the latest phase.
            observer(self, self._phase)
        except Exception:
            logger.warning(
                "Observer %r raised an exception during %s notification",
                observer,
                self._phase,
                exc_info=True,
            )
class GeneratorConfig:
    """Configuration for Poisson-process vehicle generation.

    Attributes:
        arrival_rate: Mean vehicle arrivals per second (lambda for
            Poisson process). Must be a finite number >= 0.
    """

    arrival_rate: float = 0.5

    def __post_init__(self) -> None:
        """Validate that arrival_rate is a finite, non-negative number.

        Raises:
            ValueError: If ``arrival_rate`` is negative, NaN, or infinite.
        """
        rate = self.arrival_rate
        if rate < 0:
            raise ValueError(
                f"arrival_rate must be >= 0, got {self.arrival_rate}"
            )
        # NaN fails every comparison and would be treated like a rate of 0.
        # An infinite rate gives zero-length exponential inter-arrival
        # times, so the spawn timer would never advance.
        if not rate < float("inf"):
            raise ValueError(f"arrival_rate must be finite, got {rate}")
def __post_init__(self) -> None:
    """Validate that arrival_rate is a finite, non-negative number.

    Raises:
        ValueError: If ``arrival_rate`` is negative, NaN, or infinite.
    """
    rate = self.arrival_rate
    if rate < 0:
        raise ValueError(
            f"arrival_rate must be >= 0, got {self.arrival_rate}"
        )
    # NaN fails every comparison and would be treated like a rate of 0.
    # An infinite rate gives zero-length exponential inter-arrival times,
    # so the spawn timer would never advance.
    if not rate < float("inf"):
        raise ValueError(f"arrival_rate must be finite, got {rate}")
class VehicleGenerator:
"""Generates vehicles using a Poisson distribution arrival process.
Uses exponential inter-arrival times (``random.expovariate``) to
model realistic traffic arrival patterns. Vehicles are spawned at
position 0 with velocity 0 and assigned sequential IDs.
Args:
config: Generator configuration with arrival rate.
Defaults to ``GeneratorConfig()`` if ``None``.
rng: Random number generator instance for determinism.
Defaults to ``random.Random()`` if ``None``.
Example:
>>> gen = VehicleGenerator(
... config=GeneratorConfig(arrival_rate=0.5),
... rng=random.Random(42),
... )
>>> vehicles = gen.update(dt=0.02)
"""
def __init__(
self,
config: GeneratorConfig | None = None,
rng: random.Random | None = None,
) -> None:
self._config: GeneratorConfig = config if config is not None else GeneratorConfig()
self.___init__ method · python · L54-L64 (11 LOC)crossroads/core/vehicle_generator.py
def __init__(
    self,
    config: GeneratorConfig | None = None,
    rng: random.Random | None = None,
) -> None:
    """Set up the generator state and schedule the first spawn.

    Args:
        config: Generator configuration with arrival rate. A default
            ``GeneratorConfig()`` is created when ``None``.
        rng: Random number generator used for inter-arrival times. A
            fresh ``random.Random()`` is created when ``None``.
    """
    if config is None:
        config = GeneratorConfig()
    self._config: GeneratorConfig = config

    if rng is None:
        rng = random.Random()
    self._rng: random.Random = rng

    self._next_id: int = 0
    self._spawned_vehicles: list[Vehicle] = []
    self._time_until_next_spawn: float = 0.0

    # Replaces the 0.0 placeholder above with the first real delay.
    self._schedule_next_spawn()
_schedule_next_spawn method · python · L81-L92 (12 LOC) · crossroads/core/vehicle_generator.py
def _schedule_next_spawn(self) -> None:
    """Draw the delay until the next spawn.

    For a positive arrival rate the delay is an exponential
    inter-arrival time drawn from the generator's RNG. Otherwise the delay
    is set to infinity so no vehicle ever spawns.
    """
    rate = self._config.arrival_rate
    if rate > 0:
        delay = self._rng.expovariate(rate)
    else:
        delay = float("inf")
    self._time_until_next_spawn = delay
def update(self, dt: float) -> list[Vehicle]:
"""Advance generator by one timestep, potentially spawning vehicles.
Decrements the internal spawn timer by ``dt``. If the timer
reaches zero or below, spawns one or more vehicles (handling
high rates or large dt values via a while-loop).
Args:
dt: Time delta in seconds since the last update.
Returns:
List of newly spawned vehicles (empty if none spawned).
"""
self._time_until_next_spawn -= dt
spawned: list[Vehicle] = []
while self._time_until_next_spawn <= 0:
vehicle = Vehicle(position=0.0, velocity=0.0)
vehicle.vehicle_id = self._next_id # type: ignore[attr-defined]
self._next_id += 1
spawned.append(vehicle)
self._spawned_vehicles.append(vehicle)
# Use += (not _schedule_next_spawn which uses =) to
# preserve sub-dt timing accuracy across overfloVehicleConfig class · python · L19-L50 (32 LOC)crossroads/core/vehicle.py
class VehicleConfig:
"""Configuration for IDM vehicle physics parameters.
Attributes:
desired_speed: Target speed on free road (v0) in m/s.
max_acceleration: Maximum acceleration (a) in m/s^2.
comfortable_deceleration: Comfortable braking deceleration (b) in m/s^2.
min_spacing: Minimum gap at standstill (s0) in meters.
time_headway: Safe time headway (T) in seconds.
acceleration_exponent: Free-road acceleration exponent (delta).
"""
desired_speed: float = 15.0
max_acceleration: float = 2.0
comfortable_deceleration: float = 3.0
min_spacing: float = 2.0
time_headway: float = 1.5
acceleration_exponent: float = 4.0
def __post_init__(self) -> None:
"""Validate that all parameters are non-negative."""
for name in (
"desired_speed",
"max_acceleration",
"comfortable_deceleration",
"min_spacing",
"time_headway",
"accel__post_init__ method · python · L38-L50 (13 LOC)crossroads/core/vehicle.py
def __post_init__(self) -> None:
    """Validate that all parameters are non-negative numbers.

    Raises:
        ValueError: If any IDM parameter is negative or NaN.
    """
    for name in (
        "desired_speed",
        "max_acceleration",
        "comfortable_deceleration",
        "min_spacing",
        "time_headway",
        "acceleration_exponent",
    ):
        value = getattr(self, name)
        # ``not value >= 0`` (rather than ``value < 0``) also rejects NaN,
        # for which every ordering comparison is False.
        if not value >= 0:
            raise ValueError(f"{name} must be >= 0, got {value}")
class Vehicle:
"""Vehicle with Intelligent Driver Model physics.
Computes acceleration using the IDM equation, responding to free-road
conditions, car-following situations, and traffic light signals.
Args:
position: Initial position in meters along the road.
velocity: Initial velocity in m/s. Defaults to 0.0.
config: IDM parameter configuration. Defaults to
``VehicleConfig()`` if ``None``.
Example:
>>> vehicle = Vehicle(position=0.0, velocity=5.0)
>>> vehicle.update(dt=0.02)
>>> vehicle.velocity > 5.0
True
"""
def __init__(
self,
position: float,
velocity: float = 0.0,
config: VehicleConfig | None = None,
) -> None:
self._position: float = position
self._velocity: float = velocity
self._acceleration: float = 0.0
self._config: VehicleConfig = config if config is not None else VehicleConfig()
self._signal_phase: Tr__init__ method · python · L72-L84 (13 LOC)crossroads/core/vehicle.py
def __init__(
    self,
    position: float,
    velocity: float = 0.0,
    config: VehicleConfig | None = None,
) -> None:
    """Initialise kinematic state and clear all interaction references.

    Args:
        position: Initial position in meters along the road.
        velocity: Initial velocity in m/s. Defaults to 0.0.
        config: IDM parameter configuration. A default
            ``VehicleConfig()`` is created when ``None``.
    """
    # Kinematic state.
    self._position: float = position
    self._velocity: float = velocity
    self._acceleration: float = 0.0

    if config is None:
        config = VehicleConfig()
    self._config: VehicleConfig = config

    # Nothing is known yet about the signal, stop line, or leader.
    self._signal_phase: TrafficLightPhase | None = None
    self._stop_distance: float | None = None
    self._leader: Vehicle | None = None
def stop_distance(self, value: float | None) -> None:
    """Store the stop line position, or ``None`` to clear it.

    Args:
        value: New stop line position, or ``None`` for no stop line.
    """
    self._stop_distance = value
def leader(self, value: Vehicle | None) -> None:
    """Store the lead vehicle reference, or ``None`` to clear it.

    Args:
        value: The vehicle directly ahead, or ``None`` when there is none.
    """
    self._leader = value
on_signal_change method · python · L126-L138 (13 LOC) · crossroads/core/vehicle.py
def on_signal_change(self, light: TrafficLight, new_phase: TrafficLightPhase) -> None:
    """React to a traffic light phase change.

    Has the ``callback(light, new_phase)`` signature expected by
    ``TrafficLight.register_observer()``. The new phase is always
    recorded, and a change to GREEN also clears the stop line position.

    Args:
        light: The traffic light that changed phase.
        new_phase: The new phase of the traffic light.
    """
    self._signal_phase = new_phase
    turned_green = new_phase == TrafficLightPhase.GREEN
    if turned_green:
        self._stop_distance = None
def update(self, dt: float) -> None:
    """Advance vehicle physics by one timestep.

    Computes the IDM acceleration, then integrates position and velocity
    assuming constant acceleration over the step. Velocity is clamped to
    ``[0, desired_speed]``. If braking would push the velocity below zero,
    the vehicle is treated as coming to rest part-way through the step, so
    its position never decreases.

    Args:
        dt: Time delta in seconds since the last update.
    """
    accel = self._compute_idm_acceleration()
    self._acceleration = accel
    speed = self._velocity
    next_speed = speed + accel * dt

    if next_speed < 0.0:
        # The vehicle stops inside this step. The full-step formula would
        # subtract 0.5*|a|*dt**2 from the position, so a stationary vehicle
        # with negative acceleration would creep backward every step.
        # Advance by the braking distance v**2 / (2*|a|) instead.
        if speed > 0.0 and accel < 0.0:
            self._position += speed * speed / (-2.0 * accel)
        self._velocity = 0.0
        return

    self._position += speed * dt + 0.5 * accel * dt * dt
    self._velocity = max(0.0, min(next_speed, self._config.desired_speed))
def _compute_idm_acceleration(self) -> float:
"""Compute acceleration using the Intelligent Driver Model.
Returns:
The IDM acceleration in m/s^2.
"""
a = self._config.max_acceleration
v = self._velocity
v0 = self._config.desired_speed
delta = self._config.acceleration_exponent
# Free-road term
if v0 == 0:
free_road_term = 0.0
else:
free_road_term = a * (1 - (v / v0) ** delta)
# Determine the most restrictive interaction (leader or red/yellow signal)
gap: float | None = None
delta_v: float = 0.0
if self._leader is not None:
leader_gap = self._leader.position - self._position
if leader_gap > 0:
gap = leader_gap
delta_v = v - self._leader.velocity
if self._signal_phase in (TrafficLightPhase.RED, TrafficLightPhase.YELLOW):
if self._stop_distance is not None:
SimulationEngineConfig class · python · L15-L40 (26 LOC)crossroads/simulation/engine.py
class SimulationEngineConfig:
"""Configuration for the simulation engine timing.
Attributes:
timestep: Fixed simulation timestep in seconds. Must be > 0.
seed: Optional seed for tracking determinism configuration.
This field is **informational only** — the engine does not
create or inject RNG instances. To achieve deterministic
simulation, construct each VehicleGenerator with a
``random.Random(seed + offset)`` instance directly.
max_frame_time: Maximum elapsed time accepted per update() call
in seconds. Prevents spiral-of-death. Must be > 0.
"""
timestep: float = 0.02
seed: int | None = None
max_frame_time: float = 0.25
def __post_init__(self) -> None:
"""Validate that timing parameters are positive."""
if self.timestep <= 0:
raise ValueError(f"timestep must be > 0, got {self.timestep}")
if self.max_frame_time <= 0:
raise Val__post_init__ method · python · L33-L40 (8 LOC)crossroads/simulation/engine.py
def __post_init__(self) -> None:
    """Validate that timing parameters are strictly positive numbers.

    Raises:
        ValueError: If ``timestep`` or ``max_frame_time`` is zero,
            negative, or NaN.
    """
    # ``not x > 0`` (rather than ``x <= 0``) also rejects NaN, for which
    # every ordering comparison is False.
    if not self.timestep > 0:
        raise ValueError(f"timestep must be > 0, got {self.timestep}")
    if not self.max_frame_time > 0:
        raise ValueError(
            f"max_frame_time must be > 0, got {self.max_frame_time}"
        )
class SimulationEngine:
"""Fixed-timestep simulation engine orchestrating core components.
Coordinates an Intersection and VehicleGenerators using a
fixed-timestep accumulator pattern (dt=0.02s, 50Hz). External code
calls ``update(elapsed_time)`` each frame; the engine consumes
elapsed time in fixed-dt chunks, executing one simulation step per
chunk.
Args:
config: Engine timing configuration. Defaults to
``SimulationEngineConfig()`` if ``None``.
intersection: The intersection to simulate. If ``None``,
generators still update but vehicles are not added.
generators: Mapping from direction name to VehicleGenerator.
Defaults to empty dict if ``None``.
Raises:
ValueError: If a generator direction is not valid for the
intersection.
Example:
>>> from crossroads.core.intersection import Intersection
>>> from crossroads.core.vehicle_generator import VehicleGene__init__ method · python · L76-L103 (28 LOC)crossroads/simulation/engine.py
def __init__(
self,
config: SimulationEngineConfig | None = None,
intersection: Intersection | None = None,
generators: dict[str, VehicleGenerator] | None = None,
) -> None:
self._config = config if config is not None else SimulationEngineConfig()
self._intersection = intersection
self._generators: dict[str, VehicleGenerator] = (
generators if generators is not None else {}
)
self._accumulator: float = 0.0
self._current_time: float = 0.0
self._step_count: int = 0
self._total_vehicles_spawned: int = 0
self._is_running: bool = True
# Validate generator directions against intersection
if self._intersection is not None:
valid_directions = set(self._intersection.lights.keys())
for direction in self._generators:
if direction not in valid_directions:
raise ValueError(
f"is_running method · python · L121-L123 (3 LOC)crossroads/simulation/engine.py
def is_running(self, value: bool) -> None:
    """Store whether the engine is running.

    Args:
        value: ``True`` to mark the engine as running, ``False`` otherwise.
    """
    self._is_running = value
update method · python · L150-L176 (27 LOC) · crossroads/simulation/engine.py
def update(self, elapsed_time: float) -> None:
"""Advance the simulation by elapsed wall-clock seconds.
Feeds elapsed time into an accumulator and consumes it in
fixed-timestep chunks. Any remainder stays in the accumulator
for the next call.
If ``is_running`` is ``False``, returns immediately as a no-op
without modifying the accumulator or current_time.
Args:
elapsed_time: Wall-clock time in seconds since the last
call. Negative values are clamped to 0. Values exceeding
``max_frame_time`` are clamped to prevent spiral-of-death.
"""
if not self._is_running:
return
# Clamp to [0, max_frame_time]
elapsed_time = max(0.0, min(elapsed_time, self._config.max_frame_time))
self._accumulator += elapsed_time
dt = self._config.timestep
while self._accumulator >= dt:
self._step(dt)
self._accumulator_step method · python · L178-L204 (27 LOC)crossroads/simulation/engine.py
def _step(self, dt: float) -> None:
"""Execute one fixed-timestep simulation update.
Update sequence per Architecture Component 5:
1. All generators update, collecting spawned vehicles
2. Spawned vehicles added to intersection with their direction
3. Intersection updates (lights, vehicles, collisions)
4. Simulation time and step count advance
Args:
dt: The fixed timestep duration in seconds.
"""
# 1 & 2: Generate vehicles and add to intersection
for direction, generator in self._generators.items():
spawned = generator.update(dt)
for vehicle in spawned:
if self._intersection is not None:
self._intersection.add_vehicle(vehicle, direction)
self._total_vehicles_spawned += 1
# 3: Update intersection (lights, leaders, vehicles, collisions)
if self._intersection is not None:
self._intersectionstart method · python · L206-L212 (7 LOC)crossroads/simulation/engine.py
def start(self) -> None:
    """Mark the engine as running.

    Only the running flag changes. Timing state and counters are left
    as they are. The current simulation time is logged.
    """
    self._is_running = True
    logger.info("Simulation started at t=%.2f", self._current_time)
def stop(self) -> None:
    """Mark the engine as not running.

    Only the running flag changes. All other state is preserved. The
    current simulation time is logged.
    """
    self._is_running = False
    logger.info("Simulation stopped at t=%.2f", self._current_time)
def reset(self) -> None:
    """Zero the engine's own timing state and counters.

    Clears the accumulator, the current time, the step count, and the
    spawned-vehicle counter. The intersection and the generators are
    external and are NOT reset here.
    """
    # Float timing state.
    self._accumulator = self._current_time = 0.0
    # Integer counters.
    self._step_count = self._total_vehicles_spawned = 0
    logger.info("Simulation engine reset")
class EventBus:
"""Named-event publish/subscribe bus for cross-component communication.
Allows components to subscribe callbacks to named events and receive
notifications when those events are emitted. Each callback error is
isolated so that one failing subscriber does not prevent others from
executing.
Example:
>>> bus = EventBus()
>>> bus.subscribe("signal_changed", lambda phase: print(phase))
>>> bus.emit("signal_changed", "GREEN")
GREEN
"""
def __init__(self) -> None:
self._listeners: dict[str, list[Callable[..., None]]] = {}
@property
def events(self) -> list[str]:
"""List of event names that have at least one subscriber."""
return [name for name, cbs in self._listeners.items() if cbs]
def subscriber_count(self, event: str) -> int:
"""Return the number of subscribers for a given event.
Args:
event: The event name to query.
Returns:
subscriber_count method · python · L41-L51 (11 LOC)crossroads/simulation/events.py
def subscriber_count(self, event: str) -> int:
    """Count the callbacks registered for an event.

    Args:
        event: The event name to query.

    Returns:
        How many callbacks are registered under ``event``, or 0 when the
        event has no entry.
    """
    registered = self._listeners.get(event, [])
    return len(registered)
def subscribe(self, event: str, callback: Callable[..., None]) -> None:
    """Attach a callback to a named event.

    Registering the same callback more than once is allowed. It is then
    invoked once per registration each time the event is emitted.

    Args:
        event: The event name to subscribe to.
        callback: A callable invoked when the event is emitted.
    """
    # Create the event's list on first use, then append to it.
    bucket = self._listeners.setdefault(event, [])
    bucket.append(callback)
unsubscribe method · python · L65-L85 (21 LOC) · crossroads/simulation/events.py
def unsubscribe(self, event: str, callback: Callable[..., None]) -> None:
    """Detach a callback from a named event.

    Only the first occurrence is removed when the callback was
    registered several times.

    Args:
        event: The event name to unsubscribe from.
        callback: The callback to remove.

    Raises:
        ValueError: If the callback is not registered for the event.
    """
    registered = self._listeners.get(event, [])
    try:
        registered.remove(callback)
    except ValueError:
        # Re-raise with a message naming the callback and the event.
        # ``from None`` hides the bare list.remove error.
        raise ValueError(
            f"Callback {callback!r} is not registered for event "
            f"'{event}'"
        ) from None
def emit(self, event: str, *args: Any, **kwargs: Any) -> None:
    """Publish a named event to every subscribed callback.

    Callbacks run in registration order over a snapshot of the
    subscriber list taken when the call starts. An exception raised by a
    callback is logged as a warning and the remaining callbacks still run.

    Args:
        event: The event name to emit.
        *args: Positional arguments forwarded to each callback.
        **kwargs: Keyword arguments forwarded to each callback.
    """
    snapshot = list(self._listeners.get(event, []))
    for handler in snapshot:
        try:
            handler(*args, **kwargs)
        except Exception:
            logger.warning(
                "Subscriber %r raised an exception during '%s' event",
                handler,
                event,
                exc_info=True,
            )
def clear(self, event: str | None = None) -> None:
    """Drop subscribers for a single event, or for every event.

    Args:
        event: If provided, clear only that event's subscribers.
            If ``None``, clear all events.
    """
    if event is None:
        self._listeners.clear()
        return
    # An unknown event name is silently ignored.
    self._listeners.pop(event, None)