Function bodies 210 total
PluginRegistry.list_plugins method · python · L229-L237 (9 LOC)src/agent_sim_bridge/plugins/registry.py
def list_plugins(self) -> list[str]:
"""Return a sorted list of all registered plugin names.
Returns
-------
list[str]
Plugin names in alphabetical order.
"""
return sorted(self._plugins)PluginRegistry.__repr__ method · python · L247-L252 (6 LOC)src/agent_sim_bridge/plugins/registry.py
def __repr__(self) -> str:
return (
f"PluginRegistry(name={self._name!r}, "
f"base_class={self._base_class.__name__}, "
f"plugins={self.list_plugins()})"
)PluginRegistry.load_entrypoints method · python · L258-L311 (54 LOC)src/agent_sim_bridge/plugins/registry.py
def load_entrypoints(self, group: str) -> None:
"""Discover and register plugins declared as package entry-points.
Iterates over all installed distributions that declare entry-points
in ``group``. Each entry-point value is imported and registered
under the entry-point name.
Plugins that are already registered (e.g., from a previous call)
are skipped with a debug-level log entry rather than raising an
error. This makes repeated calls to ``load_entrypoints`` idempotent.
Parameters
----------
group:
The entry-point group name, e.g. "agent_sim_bridge.plugins".
Example
-------
In a downstream package's ``pyproject.toml``::
[agent_sim_bridge.plugins]
my-processor = "my_package.processors:MyProcessor"
Then at runtime::
registry.load_entrypoints("agent_sim_bridge.plugins")
"""
entry_points = importlib.metadataProductionReadinessScorer.score method · python · L179-L211 (33 LOC)src/agent_sim_bridge/readiness/scorer.py
def score(self, metrics: ReadinessInput) -> ReadinessReport:
"""Compute a :class:`ReadinessReport` from raw metrics.
Parameters
----------
metrics:
The :class:`ReadinessInput` containing raw measurements.
Returns
-------
ReadinessReport
The full readiness report.
"""
dimensions: list[DimensionScore] = [
self._score_reliability(metrics),
self._score_safety(metrics),
self._score_performance(metrics),
self._score_cost(metrics),
]
composite = sum(d.weighted_contribution for d in dimensions)
composite = round(min(100.0, max(0.0, composite)), 2)
grade = self._grade(composite)
recommendations = self._build_recommendations(dimensions, metrics)
return ReadinessReport(
composite_score=composite,
grade=grade,
dimensions=dimensions,
is_production_ready=cProductionReadinessScorer._score_reliability method · python · L218-L235 (18 LOC)src/agent_sim_bridge/readiness/scorer.py
def _score_reliability(m: ReadinessInput) -> DimensionScore:
"""Score reliability from uptime and error rate."""
uptime_score = m.uptime_fraction * 100.0
error_penalty = m.error_rate * 100.0
raw = max(0.0, uptime_score - error_penalty)
raw = min(100.0, raw)
weight = _WEIGHTS["reliability"]
details = (
f"Uptime {m.uptime_fraction:.1%}, error rate {m.error_rate:.1%}. "
f"Raw score: {raw:.1f}"
)
return DimensionScore(
name="reliability",
score=raw,
weight=weight,
weighted_contribution=raw * weight,
details=details,
)ProductionReadinessScorer._score_safety method · python · L238-L259 (22 LOC)src/agent_sim_bridge/readiness/scorer.py
def _score_safety(m: ReadinessInput) -> DimensionScore:
"""Score safety from violation rate."""
if m.sample_size <= 0:
raw = 0.0
else:
violation_rate = m.violation_count / m.sample_size
# 0 violations → 100; 5%+ violations → 0
raw = max(0.0, 100.0 - violation_rate * 2000.0)
raw = min(100.0, raw)
weight = _WEIGHTS["safety"]
details = (
f"{m.violation_count} violations in {m.sample_size} samples "
f"({m.violation_count / max(1, m.sample_size):.2%} rate). "
f"Raw score: {raw:.1f}"
)
return DimensionScore(
name="safety",
score=raw,
weight=weight,
weighted_contribution=raw * weight,
details=details,
)ProductionReadinessScorer._score_performance method · python · L262-L288 (27 LOC)src/agent_sim_bridge/readiness/scorer.py
def _score_performance(m: ReadinessInput) -> DimensionScore:
"""Score performance from latency p50 and p99 vs targets."""
target_p50 = max(1.0, m.latency_target_p50_ms)
target_p99 = max(1.0, m.latency_target_p99_ms)
p50_ratio = m.latency_p50_ms / target_p50
p99_ratio = m.latency_p99_ms / target_p99
# Score each: 100 at or below target, declining as ratio increases
p50_score = max(0.0, 100.0 - (p50_ratio - 1.0) * 50.0) if p50_ratio > 1.0 else 100.0
p99_score = max(0.0, 100.0 - (p99_ratio - 1.0) * 50.0) if p99_ratio > 1.0 else 100.0
raw = (p50_score * 0.4 + p99_score * 0.6)
raw = min(100.0, max(0.0, raw))
weight = _WEIGHTS["performance"]
details = (
f"p50={m.latency_p50_ms:.0f}ms (target={target_p50:.0f}ms), "
f"p99={m.latency_p99_ms:.0f}ms (target={target_p99:.0f}ms). "
f"Raw score: {raw:.1f}"
)
return DimensionScore(
nameOpen data scored by Repobility · https://repobility.com
ProductionReadinessScorer._score_cost method · python · L291-L315 (25 LOC)src/agent_sim_bridge/readiness/scorer.py
def _score_cost(m: ReadinessInput) -> DimensionScore:
"""Score cost efficiency against budget."""
if m.cost_budget_usd <= 0.0:
raw = 0.0
else:
ratio = m.cost_per_task_usd / m.cost_budget_usd
if ratio <= 1.0:
raw = 100.0
else:
# Linear decay: 2x budget → 0
raw = max(0.0, 100.0 - (ratio - 1.0) * 100.0)
raw = min(100.0, raw)
weight = _WEIGHTS["cost"]
details = (
f"Cost per task ${m.cost_per_task_usd:.4f} vs budget ${m.cost_budget_usd:.4f} "
f"(ratio={m.cost_per_task_usd / max(1e-9, m.cost_budget_usd):.2f}). "
f"Raw score: {raw:.1f}"
)
return DimensionScore(
name="cost",
score=raw,
weight=weight,
weighted_contribution=raw * weight,
details=details,
)ProductionReadinessScorer._grade method · python · L322-L330 (9 LOC)src/agent_sim_bridge/readiness/scorer.py
def _grade(composite: float) -> ReadinessGrade:
"""Map a composite score to a :class:`ReadinessGrade`."""
if composite >= 85.0:
return ReadinessGrade.PRODUCTION_READY
if composite >= 70.0:
return ReadinessGrade.MOSTLY_READY
if composite >= 50.0:
return ReadinessGrade.NEEDS_WORK
return ReadinessGrade.NOT_READYProductionReadinessScorer._build_recommendations method · python · L333-L361 (29 LOC)src/agent_sim_bridge/readiness/scorer.py
def _build_recommendations(
dimensions: list[DimensionScore], m: ReadinessInput
) -> list[str]:
"""Build actionable recommendations based on dimension scores."""
recs: list[str] = []
for dim in dimensions:
if dim.name == "reliability" and dim.score < 80.0:
recs.append(
f"Improve reliability: uptime={m.uptime_fraction:.1%}, "
f"error_rate={m.error_rate:.1%}. Target uptime>99.5%, error_rate<1%."
)
if dim.name == "safety" and dim.score < 80.0:
recs.append(
f"Reduce safety violations: {m.violation_count} in {m.sample_size} samples. "
"Review safety constraints and add guardrails."
)
if dim.name == "performance" and dim.score < 80.0:
recs.append(
f"Improve latency: p50={m.latency_p50_ms:.0f}ms, "
f"p99={m.latency_p99_msBoundaryDefinition.__post_init__ method · python · L52-L64 (13 LOC)src/agent_sim_bridge/safety/boundaries.py
def __post_init__(self) -> None:
if len(self.lower_bounds) != len(self.upper_bounds):
raise ValueError(
f"Boundary {self.name!r}: lower_bounds length "
f"({len(self.lower_bounds)}) must equal upper_bounds length "
f"({len(self.upper_bounds)})."
)
for i, (low, high) in enumerate(zip(self.lower_bounds, self.upper_bounds)):
if low > high:
raise ValueError(
f"Boundary {self.name!r}: lower_bounds[{i}]={low} "
f"exceeds upper_bounds[{i}]={high}."
)BoundaryDefinition.contains method · python · L71-L96 (26 LOC)src/agent_sim_bridge/safety/boundaries.py
def contains(self, point: list[float]) -> bool:
"""Return True if ``point`` lies within the boundary (inclusive).
Parameters
----------
point:
N-dimensional point to test. Length must equal :attr:`n_dims`.
Returns
-------
bool
Raises
------
ValueError
If ``point`` has the wrong dimensionality.
"""
if len(point) != self.n_dims:
raise ValueError(
f"Boundary {self.name!r} is {self.n_dims}-dimensional, "
f"but point has {len(point)} dimensions."
)
return all(
low <= value <= high
for value, low, high in zip(point, self.lower_bounds, self.upper_bounds)
)BoundaryDefinition.clamp method · python · L98-L124 (27 LOC)src/agent_sim_bridge/safety/boundaries.py
def clamp(self, point: list[float]) -> list[float]:
"""Return ``point`` clamped to the boundary limits.
Parameters
----------
point:
N-dimensional point to clamp.
Returns
-------
list[float]
Point with each coordinate clamped to ``[lower, upper]``.
Raises
------
ValueError
If ``point`` has the wrong dimensionality.
"""
if len(point) != self.n_dims:
raise ValueError(
f"Boundary {self.name!r} is {self.n_dims}-dimensional, "
f"but point has {len(point)} dimensions."
)
return [
max(low, min(value, high))
for value, low, high in zip(point, self.lower_bounds, self.upper_bounds)
]BoundaryChecker.add_boundary method · python · L177-L195 (19 LOC)src/agent_sim_bridge/safety/boundaries.py
def add_boundary(self, boundary: BoundaryDefinition) -> None:
"""Register a boundary.
Parameters
----------
boundary:
The :class:`BoundaryDefinition` to add.
Raises
------
ValueError
If a boundary with the same name is already registered.
"""
if boundary.name in self._boundaries:
raise ValueError(
f"Boundary {boundary.name!r} is already registered."
)
self._boundaries[boundary.name] = boundary
logger.debug("Registered boundary %r (%d-D).", boundary.name, boundary.n_dims)BoundaryChecker.remove_boundary method · python · L197-L207 (11 LOC)src/agent_sim_bridge/safety/boundaries.py
def remove_boundary(self, name: str) -> None:
"""Remove a registered boundary by name.
Raises
------
KeyError
If no boundary with that name is registered.
"""
if name not in self._boundaries:
raise KeyError(f"No boundary named {name!r} is registered.")
del self._boundaries[name]Repobility (the analyzer behind this table) · https://repobility.com
BoundaryChecker.check_point method · python · L209-L255 (47 LOC)src/agent_sim_bridge/safety/boundaries.py
def check_point(self, point: list[float]) -> list[BoundaryViolation]:
"""Check ``point`` against all registered boundaries.
Only boundaries whose dimensionality matches ``len(point)`` are
checked; mismatched boundaries are skipped with a warning.
Parameters
----------
point:
The point to validate.
Returns
-------
list[BoundaryViolation]
All violations found. Empty list means the point is within all
applicable boundaries.
"""
violations: list[BoundaryViolation] = []
for boundary in self._boundaries.values():
if boundary.n_dims != len(point):
logger.warning(
"Boundary %r is %d-D but point is %d-D; skipping.",
boundary.name,
boundary.n_dims,
len(point),
)
continue
if not boundary.contains(point):
BoundaryChecker.is_within_all_boundaries method · python · L257-L265 (9 LOC)src/agent_sim_bridge/safety/boundaries.py
def is_within_all_boundaries(self, point: list[float]) -> bool:
"""Return True only if the point violates no registered boundary.
Parameters
----------
point:
The point to validate.
"""
return len(self.check_point(point)) == 0SafetyChecker.update_previous_action method · python · L131-L139 (9 LOC)src/agent_sim_bridge/safety/constraints.py
def update_previous_action(self, action: list[float]) -> None:
"""Update the stored previous action for MAX_RATE checks.
Parameters
----------
action:
The action that was just applied to the environment.
"""
self._previous_action = list(action)SafetyChecker.check method · python · L141-L170 (30 LOC)src/agent_sim_bridge/safety/constraints.py
def check(
self,
action: list[float],
constraints: list[SafetyConstraint],
) -> list[SafetyViolation]:
"""Check *action* against every constraint and return all violations.
Parameters
----------
action:
The action vector to validate.
constraints:
List of :class:`SafetyConstraint` specifications to test.
Returns
-------
list[SafetyViolation]
All violations found. Empty list means the action is safe.
"""
violations: list[SafetyViolation] = []
for constraint in constraints:
violation = self._evaluate_constraint(action, constraint)
if violation is not None:
violations.append(violation)
logger.debug(
"Safety violation: %s — %s", constraint.name, violation.message
)
return violationsSafetyChecker._evaluate_constraint method · python · L172-L186 (15 LOC)src/agent_sim_bridge/safety/constraints.py
def _evaluate_constraint(
self,
action: list[float],
constraint: SafetyConstraint,
) -> SafetyViolation | None:
"""Return a :class:`SafetyViolation` if the constraint is violated, else ``None``."""
if constraint.constraint_type == ConstraintType.RANGE:
return self._check_range(action, constraint)
if constraint.constraint_type == ConstraintType.MAX_RATE:
return self._check_max_rate(action, constraint)
if constraint.constraint_type == ConstraintType.FORBIDDEN_ZONE:
return self._check_forbidden_zone(action, constraint)
# CUSTOM: callers are responsible for injecting their own logic;
# the built-in checker cannot evaluate custom constraints.
return NoneSafetyChecker._get_dimension_value method · python · L188-L199 (12 LOC)src/agent_sim_bridge/safety/constraints.py
def _get_dimension_value(
self, action: list[float], dimension: int
) -> float:
"""Return the action value for ``dimension``, or L2 norm when ``dimension == -1``."""
if dimension == -1:
return sum(v * v for v in action) ** 0.5
if dimension < 0 or dimension >= len(action):
raise IndexError(
f"Constraint dimension {dimension} is out of range for "
f"action of length {len(action)}."
)
return action[dimension]SafetyChecker._check_range method · python · L201-L227 (27 LOC)src/agent_sim_bridge/safety/constraints.py
def _check_range(
self, action: list[float], constraint: SafetyConstraint
) -> SafetyViolation | None:
value = self._get_dimension_value(action, constraint.dimension)
if value < constraint.min_value:
return SafetyViolation(
constraint_name=constraint.name,
severity=constraint.severity,
actual_value=value,
message=(
f"Dimension {constraint.dimension} value {value:.6f} is below "
f"minimum {constraint.min_value:.6f}."
),
dimension=constraint.dimension,
)
if value > constraint.max_value:
return SafetyViolation(
constraint_name=constraint.name,
severity=constraint.severity,
actual_value=value,
message=(
f"Dimension {constraint.dimension} value {value:.6f} exceeds "
f"maximum {conSafetyChecker._check_max_rate method · python · L229-L248 (20 LOC)src/agent_sim_bridge/safety/constraints.py
def _check_max_rate(
self, action: list[float], constraint: SafetyConstraint
) -> SafetyViolation | None:
if self._previous_action is None:
return None
current = self._get_dimension_value(action, constraint.dimension)
previous = self._get_dimension_value(self._previous_action, constraint.dimension)
delta = abs(current - previous)
if delta > constraint.max_delta:
return SafetyViolation(
constraint_name=constraint.name,
severity=constraint.severity,
actual_value=delta,
message=(
f"Dimension {constraint.dimension} rate of change {delta:.6f} exceeds "
f"maximum {constraint.max_delta:.6f}."
),
dimension=constraint.dimension,
)
return NoneRepobility's GitHub App fixes findings like these · https://github.com/apps/repobility-bot
SafetyChecker._check_forbidden_zone method · python · L250-L265 (16 LOC)src/agent_sim_bridge/safety/constraints.py
def _check_forbidden_zone(
self, action: list[float], constraint: SafetyConstraint
) -> SafetyViolation | None:
value = self._get_dimension_value(action, constraint.dimension)
if constraint.min_value <= value <= constraint.max_value:
return SafetyViolation(
constraint_name=constraint.name,
severity=constraint.severity,
actual_value=value,
message=(
f"Dimension {constraint.dimension} value {value:.6f} falls in "
f"forbidden zone [{constraint.min_value:.6f}, {constraint.max_value:.6f}]."
),
dimension=constraint.dimension,
)
return NoneSafetyMonitor.__init__ method · python · L82-L93 (12 LOC)src/agent_sim_bridge/safety/monitor.py
def __init__(
self,
constraints: list[SafetyConstraint],
auto_stop_on_critical: bool = True,
) -> None:
self._constraints = list(constraints)
self._auto_stop_on_critical = auto_stop_on_critical
self._checker = SafetyChecker()
self._step_records: list[MonitoredStep] = []
self._step_index: int = 0
self._monitoring: bool = False
self._emergency_stopped: bool = FalseSafetyMonitor.start_monitoring method · python · L99-L109 (11 LOC)src/agent_sim_bridge/safety/monitor.py
def start_monitoring(self) -> None:
"""Reset state and begin monitoring a new episode.
Must be called before the first :meth:`check_step`.
"""
self._step_records.clear()
self._step_index = 0
self._monitoring = True
self._emergency_stopped = False
self._checker = SafetyChecker()
logger.info("SafetyMonitor started (%d constraints).", len(self._constraints))SafetyMonitor.stop_monitoring method · python · L111-L118 (8 LOC)src/agent_sim_bridge/safety/monitor.py
def stop_monitoring(self) -> None:
"""Stop monitoring. Violation history is preserved."""
self._monitoring = False
logger.info(
"SafetyMonitor stopped after %d steps, %d violation records.",
self._step_index,
self._count_total_violations(),
)SafetyMonitor.check_step method · python · L124-L174 (51 LOC)src/agent_sim_bridge/safety/monitor.py
def check_step(self, action: list[float]) -> list[SafetyViolation]:
"""Check one action step and record any violations.
Parameters
----------
action:
The action about to be sent to the environment.
Returns
-------
list[SafetyViolation]
Violations found at this step. Empty when safe.
Raises
------
RuntimeError
If :meth:`start_monitoring` has not been called, or if
monitoring was stopped.
"""
if not self._monitoring:
raise RuntimeError(
"SafetyMonitor is not active. Call start_monitoring() first."
)
if self._emergency_stopped:
logger.warning(
"check_step called after emergency stop at step %d.", self._step_index
)
violations = self._checker.check(action, self._constraints)
emergency = False
if violations and self._auto_stop_onSafetyMonitor.emergency_stop method · python · L180-L192 (13 LOC)src/agent_sim_bridge/safety/monitor.py
def emergency_stop(self) -> None:
"""Trigger an emergency stop.
Sets :attr:`emergency_stopped` to True. Callers should poll this
flag after each step and halt actuation immediately.
This method is idempotent — calling it a second time has no effect.
"""
if not self._emergency_stopped:
self._emergency_stopped = True
logger.critical(
"EMERGENCY STOP triggered at step %d.", self._step_index
)SafetyMonitor.get_violations method · python · L198-L203 (6 LOC)src/agent_sim_bridge/safety/monitor.py
def get_violations(self) -> list[SafetyViolation]:
"""Return a flat list of all violations across all steps."""
result: list[SafetyViolation] = []
for record in self._step_records:
result.extend(record.violations)
return resultSafetyMonitor.summary method · python · L231-L241 (11 LOC)src/agent_sim_bridge/safety/monitor.py
def summary(self) -> dict[str, object]:
"""Return a plain-dict summary of monitoring results."""
severity_counts: dict[str, int] = {s.value: 0 for s in ViolationSeverity}
for violation in self.get_violations():
severity_counts[violation.severity.value] += 1
return {
"total_steps": self._step_index,
"total_violations": self._count_total_violations(),
"emergency_stopped": self._emergency_stopped,
"severity_counts": severity_counts,
}Provenance: Repobility (https://repobility.com) — every score reproducible from /scan/
SafetyMonitor.__repr__ method · python · L243-L249 (7 LOC)src/agent_sim_bridge/safety/monitor.py
def __repr__(self) -> str:
return (
f"SafetyMonitor("
f"constraints={len(self._constraints)}, "
f"monitoring={self._monitoring}, "
f"emergency_stopped={self._emergency_stopped})"
)SensorReading.__post_init__ method · python · L61-L67 (7 LOC)src/agent_sim_bridge/sensors/base.py
def __post_init__(self) -> None:
if not (0.0 <= self.confidence <= 1.0):
raise ValueError(
f"SensorReading confidence must be in [0, 1], got {self.confidence}."
)
if not self.values:
raise ValueError("SensorReading.values must not be empty.")Sensor.read method · python · L109-L116 (8 LOC)src/agent_sim_bridge/sensors/base.py
def read(self) -> SensorReading:
"""Acquire one reading from this sensor.
Returns
-------
SensorReading
The acquired measurement.
"""Sensor.calibrate method · python · L118-L123 (6 LOC)src/agent_sim_bridge/sensors/base.py
def calibrate(self) -> None:
"""Optional: perform in-place sensor calibration.
The default implementation is a no-op. Subclasses that support
calibration should override this method.
"""Sensor.__repr__ method · python · L125-L130 (6 LOC)src/agent_sim_bridge/sensors/base.py
def __repr__(self) -> str:
return (
f"{self.__class__.__name__}("
f"sensor_id={self._sensor_id!r}, "
f"sensor_type={self._sensor_type.value!r})"
)SensorFusion.__init__ method · python · L50-L56 (7 LOC)src/agent_sim_bridge/sensors/fusion.py
def __init__(
self,
strategy: FusionStrategy = FusionStrategy.WEIGHTED_AVERAGE,
output_sensor_type: SensorType | None = None,
) -> None:
self._strategy = strategy
self._output_sensor_type = output_sensor_typeSensorFusion.fuse method · python · L63-L104 (42 LOC)src/agent_sim_bridge/sensors/fusion.py
def fuse(self, readings: list[SensorReading]) -> SensorReading:
"""Combine ``readings`` into a single fused reading.
Parameters
----------
readings:
List of readings to fuse. Must be non-empty and all readings
must have the same number of values.
Returns
-------
SensorReading
Fused reading with ``sensor_id="fused"``.
Raises
------
ValueError
If ``readings`` is empty or dimensionalities are inconsistent.
"""
if not readings:
raise ValueError("Cannot fuse an empty list of readings.")
n_values = len(readings[0].values)
for index, reading in enumerate(readings):
if len(reading.values) != n_values:
raise ValueError(
f"readings[{index}] has {len(reading.values)} values; "
f"expected {n_values} (same as readings[0])."
)
senSensorFusion._weighted_average method · python · L111-L137 (27 LOC)src/agent_sim_bridge/sensors/fusion.py
def _weighted_average(
readings: list[SensorReading],
sensor_type: SensorType,
) -> SensorReading:
"""Confidence-weighted average of all readings."""
total_weight = sum(r.confidence for r in readings)
if total_weight == 0.0:
# Fall back to equal weighting when all confidences are zero.
weights = [1.0 / len(readings)] * len(readings)
else:
weights = [r.confidence / total_weight for r in readings]
n_values = len(readings[0].values)
fused_values = [
sum(weights[i] * readings[i].values[dim] for i in range(len(readings)))
for dim in range(n_values)
]
avg_confidence = sum(r.confidence for r in readings) / len(readings)
return SensorReading(
sensor_id="fused",
sensor_type=sensor_type,
values=fused_values,
confidence=avg_confidence,
timestamp=time.time(),
metadata={"sOpen data scored by Repobility · https://repobility.com
SensorFusion._simple_average method · python · L140-L160 (21 LOC)src/agent_sim_bridge/sensors/fusion.py
def _simple_average(
readings: list[SensorReading],
sensor_type: SensorType,
) -> SensorReading:
"""Unweighted average of all readings."""
n = len(readings)
n_values = len(readings[0].values)
fused_values = [
sum(readings[i].values[dim] for i in range(n)) / n
for dim in range(n_values)
]
avg_confidence = sum(r.confidence for r in readings) / n
return SensorReading(
sensor_id="fused",
sensor_type=sensor_type,
values=fused_values,
confidence=avg_confidence,
timestamp=time.time(),
metadata={"strategy": FusionStrategy.SIMPLE_AVERAGE.value, "n_fused": n},
)SensorFusion._pick_by_confidence method · python · L163-L187 (25 LOC)src/agent_sim_bridge/sensors/fusion.py
def _pick_by_confidence(
readings: list[SensorReading],
sensor_type: SensorType,
pick_highest: bool,
) -> SensorReading:
"""Return the reading with the highest (or lowest) confidence."""
chosen = max(readings, key=lambda r: r.confidence) if pick_highest else min(
readings, key=lambda r: r.confidence
)
strategy_name = (
FusionStrategy.HIGHEST_CONFIDENCE.value
if pick_highest
else FusionStrategy.LOWEST_CONFIDENCE.value
)
return SensorReading(
sensor_id="fused",
sensor_type=sensor_type,
values=list(chosen.values),
confidence=chosen.confidence,
timestamp=time.time(),
metadata={
"strategy": strategy_name,
"source_sensor_id": chosen.sensor_id,
},
)NoiseModel.apply method · python · L32-L44 (13 LOC)src/agent_sim_bridge/sensors/noise.py
def apply(self, values: list[float]) -> list[float]:
"""Return a noisy copy of ``values``.
Parameters
----------
values:
Clean sensor reading values.
Returns
-------
list[float]
Values with noise added.
"""GaussianNoise.__init__ method · python · L76-L86 (11 LOC)src/agent_sim_bridge/sensors/noise.py
def __init__(
self,
std_devs: list[float],
mean: float = 0.0,
seed: int | None = None,
) -> None:
if not std_devs:
raise ValueError("GaussianNoise requires at least one std_dev value.")
self._std_devs = list(std_devs)
self._mean = mean
self._rng = random.Random(seed)GaussianNoise.apply method · python · L88-L106 (19 LOC)src/agent_sim_bridge/sensors/noise.py
def apply(self, values: list[float]) -> list[float]:
"""Apply Gaussian noise to each element of ``values``.
Parameters
----------
values:
Clean sensor reading.
Returns
-------
list[float]
Noisy copy.
"""
result: list[float] = []
for i, value in enumerate(values):
std = self._std_devs[min(i, len(self._std_devs) - 1)]
noise = self._rng.gauss(self._mean, std) if std != 0.0 else 0.0
result.append(value + noise)
return resultUniformNoise.__init__ method · python · L134-L142 (9 LOC)src/agent_sim_bridge/sensors/noise.py
def __init__(
self,
magnitudes: list[float],
seed: int | None = None,
) -> None:
if not magnitudes:
raise ValueError("UniformNoise requires at least one magnitude value.")
self._magnitudes = list(magnitudes)
self._rng = random.Random(seed)UniformNoise.apply method · python · L144-L162 (19 LOC)src/agent_sim_bridge/sensors/noise.py
def apply(self, values: list[float]) -> list[float]:
"""Apply uniform noise to each element of ``values``.
Parameters
----------
values:
Clean sensor reading.
Returns
-------
list[float]
Noisy copy.
"""
result: list[float] = []
for i, value in enumerate(values):
magnitude = self._magnitudes[min(i, len(self._magnitudes) - 1)]
noise = self._rng.uniform(-magnitude, magnitude) if magnitude != 0.0 else 0.0
result.append(value + noise)
return resultTrajectoryStep.to_dict method · python · L59-L71 (13 LOC)src/agent_sim_bridge/simulation/recorder.py
def to_dict(self) -> dict[str, object]:
"""Serialise to a plain dict with numpy arrays as lists."""
return {
"step_index": self.step_index,
"observation": self.observation.tolist(),
"action": self.action.tolist(),
"reward": self.reward,
"next_observation": self.next_observation.tolist(),
"terminated": self.terminated,
"truncated": self.truncated,
"timestamp": self.timestamp,
"info": self.info,
}Repobility (the analyzer behind this table) · https://repobility.com
TrajectoryStep.from_dict method · python · L74-L86 (13 LOC)src/agent_sim_bridge/simulation/recorder.py
def from_dict(cls, data: dict[str, object]) -> "TrajectoryStep":
"""Deserialise from a plain dict."""
return cls(
step_index=int(data["step_index"]), # type: ignore[arg-type]
observation=np.array(data["observation"], dtype=np.float32),
action=np.array(data["action"], dtype=np.float32),
reward=float(data["reward"]), # type: ignore[arg-type]
next_observation=np.array(data["next_observation"], dtype=np.float32),
terminated=bool(data["terminated"]),
truncated=bool(data["truncated"]),
timestamp=float(data["timestamp"]), # type: ignore[arg-type]
info=dict(data.get("info", {})), # type: ignore[arg-type]
)TrajectoryRecorder.record method · python · L123-L149 (27 LOC)src/agent_sim_bridge/simulation/recorder.py
def record(
self,
observation: NDArray[np.float32],
action: NDArray[np.float32],
reward: float,
next_observation: NDArray[np.float32],
terminated: bool,
truncated: bool,
info: dict[str, object] | None = None,
) -> None:
"""Append one transition to the in-memory buffer.
Silently ignored if ``max_steps`` has already been reached.
"""
if self._max_steps is not None and len(self._steps) >= self._max_steps:
return
step = TrajectoryStep(
step_index=len(self._steps),
observation=observation.copy(),
action=action.copy(),
reward=reward,
next_observation=next_observation.copy(),
terminated=terminated,
truncated=truncated,
info=info or {},
)
self._steps.append(step)TrajectoryRecorder.save method · python · L174-L214 (41 LOC)src/agent_sim_bridge/simulation/recorder.py
def save(self, path: str | Path) -> None:
"""Save the trajectory to a compressed numpy archive (``.npz``).
Parameters
----------
path:
Destination file path. The ``.npz`` extension is appended if
not already present.
"""
destination = Path(path)
if destination.suffix != ".npz":
destination = destination.with_suffix(".npz")
destination.parent.mkdir(parents=True, exist_ok=True)
if not self._steps:
logger.warning("Saving empty trajectory to %s", destination)
arrays: dict[str, NDArray[np.float32]] = {}
if self._steps:
arrays["observations"] = np.stack(
[s.observation for s in self._steps], axis=0
)
arrays["actions"] = np.stack([s.action for s in self._steps], axis=0)
arrays["rewards"] = np.array(
[s.reward for s in self._steps], dtype=np.float32
)