← back to invincible-jha__agent-sim-bridge

Function bodies 210 total

All specs Real LLM only Function bodies
PluginRegistry.list_plugins method · python · L229-L237 (9 LOC)
src/agent_sim_bridge/plugins/registry.py
    def list_plugins(self) -> list[str]:
        """Return a sorted list of all registered plugin names.

        Returns
        -------
        list[str]
            Plugin names in alphabetical order.
        """
        return sorted(self._plugins)
PluginRegistry.__repr__ method · python · L247-L252 (6 LOC)
src/agent_sim_bridge/plugins/registry.py
    def __repr__(self) -> str:
        return (
            f"PluginRegistry(name={self._name!r}, "
            f"base_class={self._base_class.__name__}, "
            f"plugins={self.list_plugins()})"
        )
PluginRegistry.load_entrypoints method · python · L258-L311 (54 LOC)
src/agent_sim_bridge/plugins/registry.py
    def load_entrypoints(self, group: str) -> None:
        """Discover and register plugins declared as package entry-points.

        Iterates over all installed distributions that declare entry-points
        in ``group``. Each entry-point value is imported and registered
        under the entry-point name.

        Plugins that are already registered (e.g., from a previous call)
        are skipped with a debug-level log entry rather than raising an
        error. This makes repeated calls to ``load_entrypoints`` idempotent.

        Parameters
        ----------
        group:
            The entry-point group name, e.g. "agent_sim_bridge.plugins".

        Example
        -------
        In a downstream package's ``pyproject.toml``::

            [agent_sim_bridge.plugins]
            my-processor = "my_package.processors:MyProcessor"

        Then at runtime::

            registry.load_entrypoints("agent_sim_bridge.plugins")
        """
        entry_points = importlib.metadata
ProductionReadinessScorer.score method · python · L179-L211 (33 LOC)
src/agent_sim_bridge/readiness/scorer.py
    def score(self, metrics: ReadinessInput) -> ReadinessReport:
        """Compute a :class:`ReadinessReport` from raw metrics.

        Parameters
        ----------
        metrics:
            The :class:`ReadinessInput` containing raw measurements.

        Returns
        -------
        ReadinessReport
            The full readiness report.
        """
        dimensions: list[DimensionScore] = [
            self._score_reliability(metrics),
            self._score_safety(metrics),
            self._score_performance(metrics),
            self._score_cost(metrics),
        ]

        composite = sum(d.weighted_contribution for d in dimensions)
        composite = round(min(100.0, max(0.0, composite)), 2)
        grade = self._grade(composite)
        recommendations = self._build_recommendations(dimensions, metrics)

        return ReadinessReport(
            composite_score=composite,
            grade=grade,
            dimensions=dimensions,
            is_production_ready=c
ProductionReadinessScorer._score_reliability method · python · L218-L235 (18 LOC)
src/agent_sim_bridge/readiness/scorer.py
    def _score_reliability(m: ReadinessInput) -> DimensionScore:
        """Score reliability from uptime and error rate."""
        uptime_score = m.uptime_fraction * 100.0
        error_penalty = m.error_rate * 100.0
        raw = max(0.0, uptime_score - error_penalty)
        raw = min(100.0, raw)
        weight = _WEIGHTS["reliability"]
        details = (
            f"Uptime {m.uptime_fraction:.1%}, error rate {m.error_rate:.1%}. "
            f"Raw score: {raw:.1f}"
        )
        return DimensionScore(
            name="reliability",
            score=raw,
            weight=weight,
            weighted_contribution=raw * weight,
            details=details,
        )
ProductionReadinessScorer._score_safety method · python · L238-L259 (22 LOC)
src/agent_sim_bridge/readiness/scorer.py
    def _score_safety(m: ReadinessInput) -> DimensionScore:
        """Score safety from violation rate."""
        if m.sample_size <= 0:
            raw = 0.0
        else:
            violation_rate = m.violation_count / m.sample_size
            # 0 violations → 100; 5%+ violations → 0
            raw = max(0.0, 100.0 - violation_rate * 2000.0)
        raw = min(100.0, raw)
        weight = _WEIGHTS["safety"]
        details = (
            f"{m.violation_count} violations in {m.sample_size} samples "
            f"({m.violation_count / max(1, m.sample_size):.2%} rate). "
            f"Raw score: {raw:.1f}"
        )
        return DimensionScore(
            name="safety",
            score=raw,
            weight=weight,
            weighted_contribution=raw * weight,
            details=details,
        )
ProductionReadinessScorer._score_performance method · python · L262-L288 (27 LOC)
src/agent_sim_bridge/readiness/scorer.py
    def _score_performance(m: ReadinessInput) -> DimensionScore:
        """Score performance from latency p50 and p99 vs targets."""
        target_p50 = max(1.0, m.latency_target_p50_ms)
        target_p99 = max(1.0, m.latency_target_p99_ms)

        p50_ratio = m.latency_p50_ms / target_p50
        p99_ratio = m.latency_p99_ms / target_p99

        # Score each: 100 at or below target, declining as ratio increases
        p50_score = max(0.0, 100.0 - (p50_ratio - 1.0) * 50.0) if p50_ratio > 1.0 else 100.0
        p99_score = max(0.0, 100.0 - (p99_ratio - 1.0) * 50.0) if p99_ratio > 1.0 else 100.0

        raw = (p50_score * 0.4 + p99_score * 0.6)
        raw = min(100.0, max(0.0, raw))
        weight = _WEIGHTS["performance"]
        details = (
            f"p50={m.latency_p50_ms:.0f}ms (target={target_p50:.0f}ms), "
            f"p99={m.latency_p99_ms:.0f}ms (target={target_p99:.0f}ms). "
            f"Raw score: {raw:.1f}"
        )
        return DimensionScore(
            name
Open data scored by Repobility · https://repobility.com
ProductionReadinessScorer._score_cost method · python · L291-L315 (25 LOC)
src/agent_sim_bridge/readiness/scorer.py
    def _score_cost(m: ReadinessInput) -> DimensionScore:
        """Score cost efficiency against budget."""
        if m.cost_budget_usd <= 0.0:
            raw = 0.0
        else:
            ratio = m.cost_per_task_usd / m.cost_budget_usd
            if ratio <= 1.0:
                raw = 100.0
            else:
                # Linear decay: 2x budget → 0
                raw = max(0.0, 100.0 - (ratio - 1.0) * 100.0)
        raw = min(100.0, raw)
        weight = _WEIGHTS["cost"]
        details = (
            f"Cost per task ${m.cost_per_task_usd:.4f} vs budget ${m.cost_budget_usd:.4f} "
            f"(ratio={m.cost_per_task_usd / max(1e-9, m.cost_budget_usd):.2f}). "
            f"Raw score: {raw:.1f}"
        )
        return DimensionScore(
            name="cost",
            score=raw,
            weight=weight,
            weighted_contribution=raw * weight,
            details=details,
        )
ProductionReadinessScorer._grade method · python · L322-L330 (9 LOC)
src/agent_sim_bridge/readiness/scorer.py
    def _grade(composite: float) -> ReadinessGrade:
        """Map a composite score to a :class:`ReadinessGrade`."""
        if composite >= 85.0:
            return ReadinessGrade.PRODUCTION_READY
        if composite >= 70.0:
            return ReadinessGrade.MOSTLY_READY
        if composite >= 50.0:
            return ReadinessGrade.NEEDS_WORK
        return ReadinessGrade.NOT_READY
ProductionReadinessScorer._build_recommendations method · python · L333-L361 (29 LOC)
src/agent_sim_bridge/readiness/scorer.py
    def _build_recommendations(
        dimensions: list[DimensionScore], m: ReadinessInput
    ) -> list[str]:
        """Build actionable recommendations based on dimension scores."""
        recs: list[str] = []
        for dim in dimensions:
            if dim.name == "reliability" and dim.score < 80.0:
                recs.append(
                    f"Improve reliability: uptime={m.uptime_fraction:.1%}, "
                    f"error_rate={m.error_rate:.1%}. Target uptime>99.5%, error_rate<1%."
                )
            if dim.name == "safety" and dim.score < 80.0:
                recs.append(
                    f"Reduce safety violations: {m.violation_count} in {m.sample_size} samples. "
                    "Review safety constraints and add guardrails."
                )
            if dim.name == "performance" and dim.score < 80.0:
                recs.append(
                    f"Improve latency: p50={m.latency_p50_ms:.0f}ms, "
                    f"p99={m.latency_p99_ms
BoundaryDefinition.__post_init__ method · python · L52-L64 (13 LOC)
src/agent_sim_bridge/safety/boundaries.py
    def __post_init__(self) -> None:
        if len(self.lower_bounds) != len(self.upper_bounds):
            raise ValueError(
                f"Boundary {self.name!r}: lower_bounds length "
                f"({len(self.lower_bounds)}) must equal upper_bounds length "
                f"({len(self.upper_bounds)})."
            )
        for i, (low, high) in enumerate(zip(self.lower_bounds, self.upper_bounds)):
            if low > high:
                raise ValueError(
                    f"Boundary {self.name!r}: lower_bounds[{i}]={low} "
                    f"exceeds upper_bounds[{i}]={high}."
                )
BoundaryDefinition.contains method · python · L71-L96 (26 LOC)
src/agent_sim_bridge/safety/boundaries.py
    def contains(self, point: list[float]) -> bool:
        """Return True if ``point`` lies within the boundary (inclusive).

        Parameters
        ----------
        point:
            N-dimensional point to test.  Length must equal :attr:`n_dims`.

        Returns
        -------
        bool

        Raises
        ------
        ValueError
            If ``point`` has the wrong dimensionality.
        """
        if len(point) != self.n_dims:
            raise ValueError(
                f"Boundary {self.name!r} is {self.n_dims}-dimensional, "
                f"but point has {len(point)} dimensions."
            )
        return all(
            low <= value <= high
            for value, low, high in zip(point, self.lower_bounds, self.upper_bounds)
        )
BoundaryDefinition.clamp method · python · L98-L124 (27 LOC)
src/agent_sim_bridge/safety/boundaries.py
    def clamp(self, point: list[float]) -> list[float]:
        """Return ``point`` clamped to the boundary limits.

        Parameters
        ----------
        point:
            N-dimensional point to clamp.

        Returns
        -------
        list[float]
            Point with each coordinate clamped to ``[lower, upper]``.

        Raises
        ------
        ValueError
            If ``point`` has the wrong dimensionality.
        """
        if len(point) != self.n_dims:
            raise ValueError(
                f"Boundary {self.name!r} is {self.n_dims}-dimensional, "
                f"but point has {len(point)} dimensions."
            )
        return [
            max(low, min(value, high))
            for value, low, high in zip(point, self.lower_bounds, self.upper_bounds)
        ]
BoundaryChecker.add_boundary method · python · L177-L195 (19 LOC)
src/agent_sim_bridge/safety/boundaries.py
    def add_boundary(self, boundary: BoundaryDefinition) -> None:
        """Register a boundary.

        Parameters
        ----------
        boundary:
            The :class:`BoundaryDefinition` to add.

        Raises
        ------
        ValueError
            If a boundary with the same name is already registered.
        """
        if boundary.name in self._boundaries:
            raise ValueError(
                f"Boundary {boundary.name!r} is already registered."
            )
        self._boundaries[boundary.name] = boundary
        logger.debug("Registered boundary %r (%d-D).", boundary.name, boundary.n_dims)
BoundaryChecker.remove_boundary method · python · L197-L207 (11 LOC)
src/agent_sim_bridge/safety/boundaries.py
    def remove_boundary(self, name: str) -> None:
        """Remove a registered boundary by name.

        Raises
        ------
        KeyError
            If no boundary with that name is registered.
        """
        if name not in self._boundaries:
            raise KeyError(f"No boundary named {name!r} is registered.")
        del self._boundaries[name]
Repobility (the analyzer behind this table) · https://repobility.com
BoundaryChecker.check_point method · python · L209-L255 (47 LOC)
src/agent_sim_bridge/safety/boundaries.py
    def check_point(self, point: list[float]) -> list[BoundaryViolation]:
        """Check ``point`` against all registered boundaries.

        Only boundaries whose dimensionality matches ``len(point)`` are
        checked; mismatched boundaries are skipped with a warning.

        Parameters
        ----------
        point:
            The point to validate.

        Returns
        -------
        list[BoundaryViolation]
            All violations found.  Empty list means the point is within all
            applicable boundaries.
        """
        violations: list[BoundaryViolation] = []
        for boundary in self._boundaries.values():
            if boundary.n_dims != len(point):
                logger.warning(
                    "Boundary %r is %d-D but point is %d-D; skipping.",
                    boundary.name,
                    boundary.n_dims,
                    len(point),
                )
                continue
            if not boundary.contains(point):
     
BoundaryChecker.is_within_all_boundaries method · python · L257-L265 (9 LOC)
src/agent_sim_bridge/safety/boundaries.py
    def is_within_all_boundaries(self, point: list[float]) -> bool:
        """Return True only if the point violates no registered boundary.

        Parameters
        ----------
        point:
            The point to validate.
        """
        return len(self.check_point(point)) == 0
SafetyChecker.update_previous_action method · python · L131-L139 (9 LOC)
src/agent_sim_bridge/safety/constraints.py
    def update_previous_action(self, action: list[float]) -> None:
        """Update the stored previous action for MAX_RATE checks.

        Parameters
        ----------
        action:
            The action that was just applied to the environment.
        """
        self._previous_action = list(action)
SafetyChecker.check method · python · L141-L170 (30 LOC)
src/agent_sim_bridge/safety/constraints.py
    def check(
        self,
        action: list[float],
        constraints: list[SafetyConstraint],
    ) -> list[SafetyViolation]:
        """Check *action* against every constraint and return all violations.

        Parameters
        ----------
        action:
            The action vector to validate.
        constraints:
            List of :class:`SafetyConstraint` specifications to test.

        Returns
        -------
        list[SafetyViolation]
            All violations found.  Empty list means the action is safe.
        """
        violations: list[SafetyViolation] = []

        for constraint in constraints:
            violation = self._evaluate_constraint(action, constraint)
            if violation is not None:
                violations.append(violation)
                logger.debug(
                    "Safety violation: %s — %s", constraint.name, violation.message
                )

        return violations
SafetyChecker._evaluate_constraint method · python · L172-L186 (15 LOC)
src/agent_sim_bridge/safety/constraints.py
    def _evaluate_constraint(
        self,
        action: list[float],
        constraint: SafetyConstraint,
    ) -> SafetyViolation | None:
        """Return a :class:`SafetyViolation` if the constraint is violated, else ``None``."""
        if constraint.constraint_type == ConstraintType.RANGE:
            return self._check_range(action, constraint)
        if constraint.constraint_type == ConstraintType.MAX_RATE:
            return self._check_max_rate(action, constraint)
        if constraint.constraint_type == ConstraintType.FORBIDDEN_ZONE:
            return self._check_forbidden_zone(action, constraint)
        # CUSTOM: callers are responsible for injecting their own logic;
        # the built-in checker cannot evaluate custom constraints.
        return None
SafetyChecker._get_dimension_value method · python · L188-L199 (12 LOC)
src/agent_sim_bridge/safety/constraints.py
    def _get_dimension_value(
        self, action: list[float], dimension: int
    ) -> float:
        """Return the action value for ``dimension``, or L2 norm when ``dimension == -1``."""
        if dimension == -1:
            return sum(v * v for v in action) ** 0.5
        if dimension < 0 or dimension >= len(action):
            raise IndexError(
                f"Constraint dimension {dimension} is out of range for "
                f"action of length {len(action)}."
            )
        return action[dimension]
SafetyChecker._check_range method · python · L201-L227 (27 LOC)
src/agent_sim_bridge/safety/constraints.py
    def _check_range(
        self, action: list[float], constraint: SafetyConstraint
    ) -> SafetyViolation | None:
        value = self._get_dimension_value(action, constraint.dimension)
        if value < constraint.min_value:
            return SafetyViolation(
                constraint_name=constraint.name,
                severity=constraint.severity,
                actual_value=value,
                message=(
                    f"Dimension {constraint.dimension} value {value:.6f} is below "
                    f"minimum {constraint.min_value:.6f}."
                ),
                dimension=constraint.dimension,
            )
        if value > constraint.max_value:
            return SafetyViolation(
                constraint_name=constraint.name,
                severity=constraint.severity,
                actual_value=value,
                message=(
                    f"Dimension {constraint.dimension} value {value:.6f} exceeds "
                    f"maximum {con
SafetyChecker._check_max_rate method · python · L229-L248 (20 LOC)
src/agent_sim_bridge/safety/constraints.py
    def _check_max_rate(
        self, action: list[float], constraint: SafetyConstraint
    ) -> SafetyViolation | None:
        if self._previous_action is None:
            return None
        current = self._get_dimension_value(action, constraint.dimension)
        previous = self._get_dimension_value(self._previous_action, constraint.dimension)
        delta = abs(current - previous)
        if delta > constraint.max_delta:
            return SafetyViolation(
                constraint_name=constraint.name,
                severity=constraint.severity,
                actual_value=delta,
                message=(
                    f"Dimension {constraint.dimension} rate of change {delta:.6f} exceeds "
                    f"maximum {constraint.max_delta:.6f}."
                ),
                dimension=constraint.dimension,
            )
        return None
Repobility's GitHub App fixes findings like these · https://github.com/apps/repobility-bot
SafetyChecker._check_forbidden_zone method · python · L250-L265 (16 LOC)
src/agent_sim_bridge/safety/constraints.py
    def _check_forbidden_zone(
        self, action: list[float], constraint: SafetyConstraint
    ) -> SafetyViolation | None:
        value = self._get_dimension_value(action, constraint.dimension)
        if constraint.min_value <= value <= constraint.max_value:
            return SafetyViolation(
                constraint_name=constraint.name,
                severity=constraint.severity,
                actual_value=value,
                message=(
                    f"Dimension {constraint.dimension} value {value:.6f} falls in "
                    f"forbidden zone [{constraint.min_value:.6f}, {constraint.max_value:.6f}]."
                ),
                dimension=constraint.dimension,
            )
        return None
SafetyMonitor.__init__ method · python · L82-L93 (12 LOC)
src/agent_sim_bridge/safety/monitor.py
    def __init__(
        self,
        constraints: list[SafetyConstraint],
        auto_stop_on_critical: bool = True,
    ) -> None:
        self._constraints = list(constraints)
        self._auto_stop_on_critical = auto_stop_on_critical
        self._checker = SafetyChecker()
        self._step_records: list[MonitoredStep] = []
        self._step_index: int = 0
        self._monitoring: bool = False
        self._emergency_stopped: bool = False
SafetyMonitor.start_monitoring method · python · L99-L109 (11 LOC)
src/agent_sim_bridge/safety/monitor.py
    def start_monitoring(self) -> None:
        """Reset state and begin monitoring a new episode.

        Must be called before the first :meth:`check_step`.
        """
        self._step_records.clear()
        self._step_index = 0
        self._monitoring = True
        self._emergency_stopped = False
        self._checker = SafetyChecker()
        logger.info("SafetyMonitor started (%d constraints).", len(self._constraints))
SafetyMonitor.stop_monitoring method · python · L111-L118 (8 LOC)
src/agent_sim_bridge/safety/monitor.py
    def stop_monitoring(self) -> None:
        """Stop monitoring.  Violation history is preserved."""
        self._monitoring = False
        logger.info(
            "SafetyMonitor stopped after %d steps, %d violation records.",
            self._step_index,
            self._count_total_violations(),
        )
SafetyMonitor.check_step method · python · L124-L174 (51 LOC)
src/agent_sim_bridge/safety/monitor.py
    def check_step(self, action: list[float]) -> list[SafetyViolation]:
        """Check one action step and record any violations.

        Parameters
        ----------
        action:
            The action about to be sent to the environment.

        Returns
        -------
        list[SafetyViolation]
            Violations found at this step.  Empty when safe.

        Raises
        ------
        RuntimeError
            If :meth:`start_monitoring` has not been called, or if
            monitoring was stopped.
        """
        if not self._monitoring:
            raise RuntimeError(
                "SafetyMonitor is not active. Call start_monitoring() first."
            )
        if self._emergency_stopped:
            logger.warning(
                "check_step called after emergency stop at step %d.", self._step_index
            )

        violations = self._checker.check(action, self._constraints)
        emergency = False

        if violations and self._auto_stop_on
SafetyMonitor.emergency_stop method · python · L180-L192 (13 LOC)
src/agent_sim_bridge/safety/monitor.py
    def emergency_stop(self) -> None:
        """Trigger an emergency stop.

        Sets :attr:`emergency_stopped` to True.  Callers should poll this
        flag after each step and halt actuation immediately.

        This method is idempotent — calling it a second time has no effect.
        """
        if not self._emergency_stopped:
            self._emergency_stopped = True
            logger.critical(
                "EMERGENCY STOP triggered at step %d.", self._step_index
            )
SafetyMonitor.get_violations method · python · L198-L203 (6 LOC)
src/agent_sim_bridge/safety/monitor.py
    def get_violations(self) -> list[SafetyViolation]:
        """Return a flat list of all violations across all steps."""
        result: list[SafetyViolation] = []
        for record in self._step_records:
            result.extend(record.violations)
        return result
SafetyMonitor.summary method · python · L231-L241 (11 LOC)
src/agent_sim_bridge/safety/monitor.py
    def summary(self) -> dict[str, object]:
        """Return a plain-dict summary of monitoring results."""
        severity_counts: dict[str, int] = {s.value: 0 for s in ViolationSeverity}
        for violation in self.get_violations():
            severity_counts[violation.severity.value] += 1
        return {
            "total_steps": self._step_index,
            "total_violations": self._count_total_violations(),
            "emergency_stopped": self._emergency_stopped,
            "severity_counts": severity_counts,
        }
Provenance: Repobility (https://repobility.com) — every score reproducible from /scan/
SafetyMonitor.__repr__ method · python · L243-L249 (7 LOC)
src/agent_sim_bridge/safety/monitor.py
    def __repr__(self) -> str:
        return (
            f"SafetyMonitor("
            f"constraints={len(self._constraints)}, "
            f"monitoring={self._monitoring}, "
            f"emergency_stopped={self._emergency_stopped})"
        )
SensorReading.__post_init__ method · python · L61-L67 (7 LOC)
src/agent_sim_bridge/sensors/base.py
    def __post_init__(self) -> None:
        if not (0.0 <= self.confidence <= 1.0):
            raise ValueError(
                f"SensorReading confidence must be in [0, 1], got {self.confidence}."
            )
        if not self.values:
            raise ValueError("SensorReading.values must not be empty.")
Sensor.read method · python · L109-L116 (8 LOC)
src/agent_sim_bridge/sensors/base.py
    def read(self) -> SensorReading:
        """Acquire one reading from this sensor.

        Returns
        -------
        SensorReading
            The acquired measurement.
        """
Sensor.calibrate method · python · L118-L123 (6 LOC)
src/agent_sim_bridge/sensors/base.py
    def calibrate(self) -> None:
        """Optional: perform in-place sensor calibration.

        The default implementation is a no-op.  Subclasses that support
        calibration should override this method.
        """
Sensor.__repr__ method · python · L125-L130 (6 LOC)
src/agent_sim_bridge/sensors/base.py
    def __repr__(self) -> str:
        return (
            f"{self.__class__.__name__}("
            f"sensor_id={self._sensor_id!r}, "
            f"sensor_type={self._sensor_type.value!r})"
        )
SensorFusion.__init__ method · python · L50-L56 (7 LOC)
src/agent_sim_bridge/sensors/fusion.py
    def __init__(
        self,
        strategy: FusionStrategy = FusionStrategy.WEIGHTED_AVERAGE,
        output_sensor_type: SensorType | None = None,
    ) -> None:
        self._strategy = strategy
        self._output_sensor_type = output_sensor_type
SensorFusion.fuse method · python · L63-L104 (42 LOC)
src/agent_sim_bridge/sensors/fusion.py
    def fuse(self, readings: list[SensorReading]) -> SensorReading:
        """Combine ``readings`` into a single fused reading.

        Parameters
        ----------
        readings:
            List of readings to fuse.  Must be non-empty and all readings
            must have the same number of values.

        Returns
        -------
        SensorReading
            Fused reading with ``sensor_id="fused"``.

        Raises
        ------
        ValueError
            If ``readings`` is empty or dimensionalities are inconsistent.
        """
        if not readings:
            raise ValueError("Cannot fuse an empty list of readings.")

        n_values = len(readings[0].values)
        for index, reading in enumerate(readings):
            if len(reading.values) != n_values:
                raise ValueError(
                    f"readings[{index}] has {len(reading.values)} values; "
                    f"expected {n_values} (same as readings[0])."
                )

        sen
SensorFusion._weighted_average method · python · L111-L137 (27 LOC)
src/agent_sim_bridge/sensors/fusion.py
    def _weighted_average(
        readings: list[SensorReading],
        sensor_type: SensorType,
    ) -> SensorReading:
        """Confidence-weighted average of all readings."""
        total_weight = sum(r.confidence for r in readings)
        if total_weight == 0.0:
            # Fall back to equal weighting when all confidences are zero.
            weights = [1.0 / len(readings)] * len(readings)
        else:
            weights = [r.confidence / total_weight for r in readings]

        n_values = len(readings[0].values)
        fused_values = [
            sum(weights[i] * readings[i].values[dim] for i in range(len(readings)))
            for dim in range(n_values)
        ]
        avg_confidence = sum(r.confidence for r in readings) / len(readings)

        return SensorReading(
            sensor_id="fused",
            sensor_type=sensor_type,
            values=fused_values,
            confidence=avg_confidence,
            timestamp=time.time(),
            metadata={"s
Open data scored by Repobility · https://repobility.com
SensorFusion._simple_average method · python · L140-L160 (21 LOC)
src/agent_sim_bridge/sensors/fusion.py
    def _simple_average(
        readings: list[SensorReading],
        sensor_type: SensorType,
    ) -> SensorReading:
        """Unweighted average of all readings."""
        n = len(readings)
        n_values = len(readings[0].values)
        fused_values = [
            sum(readings[i].values[dim] for i in range(n)) / n
            for dim in range(n_values)
        ]
        avg_confidence = sum(r.confidence for r in readings) / n

        return SensorReading(
            sensor_id="fused",
            sensor_type=sensor_type,
            values=fused_values,
            confidence=avg_confidence,
            timestamp=time.time(),
            metadata={"strategy": FusionStrategy.SIMPLE_AVERAGE.value, "n_fused": n},
        )
SensorFusion._pick_by_confidence method · python · L163-L187 (25 LOC)
src/agent_sim_bridge/sensors/fusion.py
    def _pick_by_confidence(
        readings: list[SensorReading],
        sensor_type: SensorType,
        pick_highest: bool,
    ) -> SensorReading:
        """Return the reading with the highest (or lowest) confidence."""
        chosen = max(readings, key=lambda r: r.confidence) if pick_highest else min(
            readings, key=lambda r: r.confidence
        )
        strategy_name = (
            FusionStrategy.HIGHEST_CONFIDENCE.value
            if pick_highest
            else FusionStrategy.LOWEST_CONFIDENCE.value
        )
        return SensorReading(
            sensor_id="fused",
            sensor_type=sensor_type,
            values=list(chosen.values),
            confidence=chosen.confidence,
            timestamp=time.time(),
            metadata={
                "strategy": strategy_name,
                "source_sensor_id": chosen.sensor_id,
            },
        )
NoiseModel.apply method · python · L32-L44 (13 LOC)
src/agent_sim_bridge/sensors/noise.py
    def apply(self, values: list[float]) -> list[float]:
        """Return a noisy copy of ``values``.

        Parameters
        ----------
        values:
            Clean sensor reading values.

        Returns
        -------
        list[float]
            Values with noise added.
        """
GaussianNoise.__init__ method · python · L76-L86 (11 LOC)
src/agent_sim_bridge/sensors/noise.py
    def __init__(
        self,
        std_devs: list[float],
        mean: float = 0.0,
        seed: int | None = None,
    ) -> None:
        if not std_devs:
            raise ValueError("GaussianNoise requires at least one std_dev value.")
        self._std_devs = list(std_devs)
        self._mean = mean
        self._rng = random.Random(seed)
GaussianNoise.apply method · python · L88-L106 (19 LOC)
src/agent_sim_bridge/sensors/noise.py
    def apply(self, values: list[float]) -> list[float]:
        """Apply Gaussian noise to each element of ``values``.

        Parameters
        ----------
        values:
            Clean sensor reading.

        Returns
        -------
        list[float]
            Noisy copy.
        """
        result: list[float] = []
        for i, value in enumerate(values):
            std = self._std_devs[min(i, len(self._std_devs) - 1)]
            noise = self._rng.gauss(self._mean, std) if std != 0.0 else 0.0
            result.append(value + noise)
        return result
UniformNoise.__init__ method · python · L134-L142 (9 LOC)
src/agent_sim_bridge/sensors/noise.py
    def __init__(
        self,
        magnitudes: list[float],
        seed: int | None = None,
    ) -> None:
        if not magnitudes:
            raise ValueError("UniformNoise requires at least one magnitude value.")
        self._magnitudes = list(magnitudes)
        self._rng = random.Random(seed)
UniformNoise.apply method · python · L144-L162 (19 LOC)
src/agent_sim_bridge/sensors/noise.py
    def apply(self, values: list[float]) -> list[float]:
        """Apply uniform noise to each element of ``values``.

        Parameters
        ----------
        values:
            Clean sensor reading.

        Returns
        -------
        list[float]
            Noisy copy.
        """
        result: list[float] = []
        for i, value in enumerate(values):
            magnitude = self._magnitudes[min(i, len(self._magnitudes) - 1)]
            noise = self._rng.uniform(-magnitude, magnitude) if magnitude != 0.0 else 0.0
            result.append(value + noise)
        return result
TrajectoryStep.to_dict method · python · L59-L71 (13 LOC)
src/agent_sim_bridge/simulation/recorder.py
    def to_dict(self) -> dict[str, object]:
        """Serialise to a plain dict with numpy arrays as lists."""
        return {
            "step_index": self.step_index,
            "observation": self.observation.tolist(),
            "action": self.action.tolist(),
            "reward": self.reward,
            "next_observation": self.next_observation.tolist(),
            "terminated": self.terminated,
            "truncated": self.truncated,
            "timestamp": self.timestamp,
            "info": self.info,
        }
Repobility (the analyzer behind this table) · https://repobility.com
TrajectoryStep.from_dict method · python · L74-L86 (13 LOC)
src/agent_sim_bridge/simulation/recorder.py
    def from_dict(cls, data: dict[str, object]) -> "TrajectoryStep":
        """Deserialise from a plain dict."""
        return cls(
            step_index=int(data["step_index"]),  # type: ignore[arg-type]
            observation=np.array(data["observation"], dtype=np.float32),
            action=np.array(data["action"], dtype=np.float32),
            reward=float(data["reward"]),  # type: ignore[arg-type]
            next_observation=np.array(data["next_observation"], dtype=np.float32),
            terminated=bool(data["terminated"]),
            truncated=bool(data["truncated"]),
            timestamp=float(data["timestamp"]),  # type: ignore[arg-type]
            info=dict(data.get("info", {})),  # type: ignore[arg-type]
        )
TrajectoryRecorder.record method · python · L123-L149 (27 LOC)
src/agent_sim_bridge/simulation/recorder.py
    def record(
        self,
        observation: NDArray[np.float32],
        action: NDArray[np.float32],
        reward: float,
        next_observation: NDArray[np.float32],
        terminated: bool,
        truncated: bool,
        info: dict[str, object] | None = None,
    ) -> None:
        """Append one transition to the in-memory buffer.

        Silently ignored if ``max_steps`` has already been reached.
        """
        if self._max_steps is not None and len(self._steps) >= self._max_steps:
            return
        step = TrajectoryStep(
            step_index=len(self._steps),
            observation=observation.copy(),
            action=action.copy(),
            reward=reward,
            next_observation=next_observation.copy(),
            terminated=terminated,
            truncated=truncated,
            info=info or {},
        )
        self._steps.append(step)
TrajectoryRecorder.save method · python · L174-L214 (41 LOC)
src/agent_sim_bridge/simulation/recorder.py
    def save(self, path: str | Path) -> None:
        """Save the trajectory to a compressed numpy archive (``.npz``).

        Parameters
        ----------
        path:
            Destination file path. The ``.npz`` extension is appended if
            not already present.
        """
        destination = Path(path)
        if destination.suffix != ".npz":
            destination = destination.with_suffix(".npz")
        destination.parent.mkdir(parents=True, exist_ok=True)

        if not self._steps:
            logger.warning("Saving empty trajectory to %s", destination)

        arrays: dict[str, NDArray[np.float32]] = {}
        if self._steps:
            arrays["observations"] = np.stack(
                [s.observation for s in self._steps], axis=0
            )
            arrays["actions"] = np.stack([s.action for s in self._steps], axis=0)
            arrays["rewards"] = np.array(
                [s.reward for s in self._steps], dtype=np.float32
            )
           
‹ prevpage 3 / 5next ›