Function bodies 210 total
GazeboBackend.backend_reset method · python · L42-L47 (6 LOC)src/agent_sim_bridge/backends/gazebo.py
def backend_reset(
    self,
    seed: int | None,
    options: dict[str, object] | None,
) -> NDArray[np.float32]:
    """Reset hook of the Gazebo backend stub.

    Parameters
    ----------
    seed:
        Seed supplied by the caller; not used by this stub.
    options:
        Reset options supplied by the caller; not used by this stub.

    Raises
    ------
    NotImplementedError
        Always, carrying ``_STUB_MESSAGE``.
    """
    stub_error = NotImplementedError(_STUB_MESSAGE)
    raise stub_error


# PyBulletBackend.__init__ method · python · L64-L74 (11 LOC)src/agent_sim_bridge/backends/pybullet.py
def __init__(self, use_gui: bool = False) -> None:
    """Create the PyBullet backend stub.

    Parameters
    ----------
    use_gui:
        Stored on the instance as ``_use_gui`` and echoed in the debug log.

    Raises
    ------
    ImportError
        If the module-level ``_PYBULLET_AVAILABLE`` flag is falsy.
    """
    if not _PYBULLET_AVAILABLE:
        install_hint = (
            "PyBullet is not installed. Install it with: pip install pybullet"
        )
        raise ImportError(install_hint)

    self._use_gui = use_gui

    debug_template = (
        "PyBulletBackend initialised (use_gui=%s). "
        "Note: this is a stub — override all abstract methods."
    )
    logger.debug(debug_template, use_gui)


# PyBulletBackend.backend_reset method · python · L76-L81 (6 LOC)src/agent_sim_bridge/backends/pybullet.py
def backend_reset(
    self,
    seed: int | None,
    options: dict[str, object] | None,
) -> NDArray[np.float32]:
    """Reset hook of the PyBullet backend stub.

    Parameters
    ----------
    seed:
        Seed supplied by the caller; not used by this stub.
    options:
        Reset options supplied by the caller; not used by this stub.

    Raises
    ------
    NotImplementedError
        Always, carrying ``_STUB_MESSAGE``.
    """
    stub_error = NotImplementedError(_STUB_MESSAGE)
    raise stub_error


# CascadeSimulator.add_node method · python · L152-L162 (11 LOC)src/agent_sim_bridge/cascade/simulator.py
def add_node(self, node: AgentNode) -> None:
    """Register *node* in the dependency graph.

    The node is stored under ``node.node_id``, replacing any node already
    stored under that id. A dependents list is created for the id only
    when none exists yet, so previously recorded edges are kept.

    Parameters
    ----------
    node:
        The :class:`AgentNode` to add.
    """
    node_id = node.node_id
    self._nodes[node_id] = node
    # setdefault leaves an existing dependents list untouched.
    self._dependents.setdefault(node_id, [])


# CascadeSimulator.add_dependency method · python · L164-L186 (23 LOC)src/agent_sim_bridge/cascade/simulator.py
def add_dependency(self, dependency_id: str, dependent_id: str) -> None:
    """Record that *dependent_id* depends on *dependency_id*.

    A failure in *dependency_id* will propagate to *dependent_id*.
    Declaring the same edge more than once has no further effect.

    Parameters
    ----------
    dependency_id:
        The upstream node (the one that can fail).
    dependent_id:
        The downstream node (the one affected when upstream fails).

    Raises
    ------
    KeyError
        If either node has not been added via :meth:`add_node`. The
        upstream id is checked first.
    """
    for required_id in (dependency_id, dependent_id):
        if required_id not in self._nodes:
            raise KeyError(f"Node '{required_id}' not in graph")

    downstream = self._dependents[dependency_id]
    if dependent_id not in downstream:
        downstream.append(dependent_id)


# CascadeSimulator.get_node method · python · L192-L200 (9 LOC)src/agent_sim_bridge/cascade/simulator.py
def get_node(self, node_id: str) -> AgentNode:
    """Look up a node by its id.

    Parameters
    ----------
    node_id:
        Id of the node to fetch.

    Raises
    ------
    KeyError
        If the node is not in the graph.
    """
    registry = self._nodes
    return registry[node_id]


# CascadeSimulator.simulate_failure method · python · L206-L295 (90 LOC)src/agent_sim_bridge/cascade/simulator.py
def simulate_failure(
self,
origin_node_id: str,
failure_mode: FailureMode | None = None,
) -> CascadeResult:
"""Inject a failure at *origin_node_id* and propagate it.
Uses breadth-first traversal over the dependency graph. Each
node's ``propagation_probability`` is treated as a hard threshold
for deterministic testing (probability=1.0 means always propagate;
probability=0.0 means never propagates beyond that node).
Parameters
----------
origin_node_id:
The node where the failure originates.
failure_mode:
Override the origin node's default failure mode.
Returns
-------
CascadeResult
Full cascade simulation result.
Raises
------
KeyError
If *origin_node_id* is not in the graph.
"""
if origin_node_id not in self._nodes:
raise KeyError(f"Node '{origin_node_id}Repobility — same analyzer, your code, free for public repos · /scan/
CascadeSimulator.topological_order method · python · L302-L335 (34 LOC)src/agent_sim_bridge/cascade/simulator.py
def topological_order(self) -> list[str]:
"""Return nodes in topological order (roots first).
Uses Kahn's algorithm. Raises ValueError on cycle detection.
Returns
-------
list[str]
Node IDs in topological order.
Raises
------
ValueError
If the graph contains a cycle.
"""
in_degree: dict[str, int] = {nid: 0 for nid in self._nodes}
for node_id, deps in self._dependents.items():
for dep_id in deps:
in_degree[dep_id] = in_degree.get(dep_id, 0) + 1
queue: deque[str] = deque(nid for nid, deg in in_degree.items() if deg == 0)
order: list[str] = []
while queue:
current = queue.popleft()
order.append(current)
for dep_id in self._dependents.get(current, []):
in_degree[dep_id] -= 1
if in_degree[dep_id] == 0:
queue.append(dep_id)
icli function · python · L50-L55 (6 LOC)src/agent_sim_bridge/cli/main.py
def cli(log_level: str) -> None:
    """Simulation-to-reality bridge for AI agents with environment adapters."""
    # Resolve the level name (any case) to the matching attribute of the
    # logging module; an unknown name raises AttributeError.
    resolved_level = getattr(logging, log_level.upper())
    log_format = "%(levelname)s %(name)s — %(message)s"
    logging.basicConfig(level=resolved_level, format=log_format)


# version_command function · python · L64-L69 (6 LOC)src/agent_sim_bridge/cli/main.py
def version_command() -> None:
    """Show detailed version information."""
    from agent_sim_bridge import __version__

    # One line for the package version, one for the interpreter.
    package_line = f"[bold]agent-sim-bridge[/bold] v{__version__}"
    console.print(package_line)
    interpreter_line = f"Python {sys.version}"
    console.print(interpreter_line)


# plugins_command function · python · L78-L83 (6 LOC)src/agent_sim_bridge/cli/main.py
def plugins_command() -> None:
    """List all registered plugins loaded from entry-points."""
    # Static output: a heading followed by a fixed "no plugins" notice.
    output_lines = (
        "[bold]Registered plugins:[/bold]",
        " (No plugins registered. Install a plugin package to see entries here.)",
    )
    for line in output_lines:
        console.print(line)


# sim_run function · python · L118-L149 (32 LOC)src/agent_sim_bridge/cli/main.py
def sim_run(
steps: int,
seed: int | None,
timeout: float,
record_path: str | None,
) -> None:
"""Execute a simulation episode using a zero-action baseline policy.
This command demonstrates the sandbox execution loop. Replace the
zero-action policy with a real agent by extending this command or
calling :class:`~agent_sim_bridge.simulation.SimulationSandbox` directly.
"""
from agent_sim_bridge.simulation.sandbox import SimulationSandbox
console.print(
f"[bold cyan]sim run[/bold cyan] — "
f"steps={steps}, seed={seed}, timeout={timeout}s"
)
console.print(
"[yellow]Note:[/yellow] No environment backend is configured. "
"This command demonstrates the CLI surface only.\n"
"Wire up a real backend by subclassing SimulationEnvironment."
)
# Show what a result would look like.
console.print("\n[bold]Configuration[/bold]")
table = Table(show_header=False, box=None)
table.add_row("Masim_record function · python · L161-L187 (27 LOC)src/agent_sim_bridge/cli/main.py
def sim_record(output_path: str, steps: int, seed: int | None) -> None:
"""Record an episode trajectory to OUTPUT_PATH (.npz).
The trajectory is captured using
:class:`~agent_sim_bridge.simulation.TrajectoryRecorder` and saved
as a compressed numpy archive. Replay the trajectory later with
``sim replay``.
"""
from agent_sim_bridge.simulation.recorder import TrajectoryRecorder
console.print(
f"[bold cyan]sim record[/bold cyan] — output={output_path!r}, "
f"steps={steps}, seed={seed}"
)
console.print(
"[yellow]Note:[/yellow] No environment backend is configured. "
"This command demonstrates the CLI surface only.\n"
"Wire up a real backend to produce actual trajectory data."
)
# Demonstrate recorder API.
recorder = TrajectoryRecorder(max_steps=steps)
console.print(
f"\nTrajectoryRecorder ready (max_steps={steps}). "
f"Would save to: [bold]{output_path}[/bold]"
)
conssim_replay function · python · L204-L239 (36 LOC)src/agent_sim_bridge/cli/main.py
def sim_replay(
trajectory_path: str,
seed: int | None,
stop_on_termination: bool,
) -> None:
"""Replay a recorded trajectory from TRAJECTORY_PATH and report divergence.
Loads the trajectory, feeds actions back through the simulation, and
computes the observation MSE and reward correlation against the original.
"""
from agent_sim_bridge.simulation.recorder import TrajectoryRecorder
console.print(
f"[bold cyan]sim replay[/bold cyan] — path={trajectory_path!r}, seed={seed}"
)
try:
recorder = TrajectoryRecorder.load(trajectory_path)
except Exception as exc:
console.print(f"[red]Error loading trajectory:[/red] {exc}")
raise SystemExit(1) from exc
console.print(f"\nLoaded trajectory: {recorder!r}")
console.print(
"[yellow]Note:[/yellow] No environment backend is configured. "
"Connect a SimulationEnvironment and TrajectoryReplay to run the full replay."
)
# Show trajectory susim_calibrate function · python · L255-L315 (61 LOC)src/agent_sim_bridge/cli/main.py
def sim_calibrate(data_path: str, output: str | None) -> None:
"""Fit a CalibrationProfile from paired sim/real observations in DATA_PATH.
DATA_PATH must be a JSON file with keys ``"sim_obs"`` and ``"real_obs"``,
each a list of observation vectors (list-of-lists of floats).
Example JSON format::
{
"sim_obs": [[1.0, 2.0], [1.1, 2.1]],
"real_obs": [[1.05, 1.98], [1.15, 2.08]]
}
"""
import pathlib
from agent_sim_bridge.transfer.calibration import Calibrator
console.print(f"[bold cyan]sim calibrate[/bold cyan] — data={data_path!r}")
try:
raw = pathlib.Path(data_path).read_text(encoding="utf-8")
data = json.loads(raw)
except Exception as exc:
console.print(f"[red]Error reading data file:[/red] {exc}")
raise SystemExit(1) from exc
sim_obs: list[list[float]] = data.get("sim_obs", [])
real_obs: list[list[float]] = data.get("real_obs", [])
if not sim_obs or not reaRepobility · open methodology · https://repobility.com/research/
sim_gap_analysis function · python · L331-L389 (59 LOC)src/agent_sim_bridge/cli/main.py
def sim_gap_analysis(data_path: str, output: str | None) -> None:
"""Measure and report the sim-to-real gap from paired trajectory data.
DATA_PATH must be a JSON file with keys ``"sim_obs"``, ``"real_obs"``,
and optionally ``"sim_rewards"`` and ``"real_rewards"``.
Example JSON format::
{
"sim_obs": [[1.0, 2.0], [1.1, 2.1]],
"real_obs": [[1.05, 1.98], [1.15, 2.08]],
"sim_rewards": [0.5, 0.6],
"real_rewards":[0.48, 0.57]
}
"""
import dataclasses
import pathlib
from agent_sim_bridge.metrics.gap import SimRealGap
console.print(f"[bold cyan]sim gap-analysis[/bold cyan] — data={data_path!r}")
try:
raw = pathlib.Path(data_path).read_text(encoding="utf-8")
data = json.loads(raw)
except Exception as exc:
console.print(f"[red]Error reading data file:[/red] {exc}")
raise SystemExit(1) from exc
sim_obs: list[list[float]] = data.get("sim_obs",sim_safety_check function · python · L406-L516 (111 LOC)src/agent_sim_bridge/cli/main.py
def sim_safety_check(action_json: str, constraints_path: str | None) -> None:
"""Validate ACTION_JSON against safety constraints.
ACTION_JSON is a JSON-encoded list of floats representing an action
vector, e.g.::
'[0.5, -0.3, 1.2]'
CONSTRAINTS_PATH (optional) is a JSON file containing a list of
constraint definitions, each with keys: ``name``, ``constraint_type``,
``dimension``, ``min_value``, ``max_value``, ``severity``.
Example constraints file::
[
{
"name": "joint_0_range",
"constraint_type": "range",
"dimension": 0,
"min_value": -1.0,
"max_value": 1.0,
"severity": "error"
}
]
"""
import pathlib
from agent_sim_bridge.safety.constraints import (
ConstraintType,
SafetyChecker,
SafetyConstraint,
ViolationSeverity,
)
console.print(f"[bold cyan]sim safety-checkgap_estimate function · python · L566-L671 (106 LOC)src/agent_sim_bridge/cli/main.py
def gap_estimate(
sim_data_path: str,
real_data_path: str,
metrics_str: str | None,
output_format: str,
) -> None:
"""Estimate the sim-to-real gap from distribution data files.
Both data files must be JSON with the schema::
{"dimensions": {"dim_name": {"sim": [values], "real": [values]}}}
Example::
agent-sim-bridge gap estimate \\
--sim-data sim.json --real-data real.json \\
--metrics kl,wasserstein --format markdown
"""
import pathlib
from agent_sim_bridge.gap.estimator import GapDimension, GapEstimator, GapMetric
from agent_sim_bridge.gap.report import GapReporter
_METRIC_MAP: dict[str, GapMetric] = {
"kl": GapMetric.KL_DIVERGENCE,
"wasserstein": GapMetric.WASSERSTEIN,
"mmd": GapMetric.MMD,
"jsd": GapMetric.JENSEN_SHANNON,
}
# Load data files.
try:
sim_raw = json.loads(pathlib.Path(sim_data_path).read_text(encoding="utf-8"))
real_rgap_compare function · python · L701-L788 (88 LOC)src/agent_sim_bridge/cli/main.py
def gap_compare(
sim_data_path: str,
real_data_path: str,
output_path: str,
) -> None:
"""Compare sim and real distributions and save a full JSON report.
Both data files must be JSON with the schema::
{"dimensions": {"dim_name": {"sim": [values], "real": [values]}}}
The report is written to OUTPUT as a JSON file.
Example::
agent-sim-bridge gap compare \\
--sim-data sim.json --real-data real.json \\
--output report.json
"""
import pathlib
from agent_sim_bridge.gap.estimator import GapDimension, GapEstimator
from agent_sim_bridge.gap.report import GapReporter
# Load data files.
try:
sim_raw = json.loads(pathlib.Path(sim_data_path).read_text(encoding="utf-8"))
real_raw = json.loads(pathlib.Path(real_data_path).read_text(encoding="utf-8"))
except Exception as exc:
console.print(f"[red]Error reading data files:[/red] {exc}")
raise SystemExit(1) from exc
_StubBackend.backend_reset method · python · L64-L72 (9 LOC)src/agent_sim_bridge/convenience.py
def backend_reset(
    self,
    seed: int | None,
    options: dict[str, object] | None,
) -> NDArray[np.float32]:
    """Optionally reseed the stub RNG, then draw an initial observation.

    Parameters
    ----------
    seed:
        When not ``None``, ``self._rng`` is replaced by a fresh
        ``np.random.default_rng(seed)`` before sampling.
    options:
        Not used by the stub.

    Returns
    -------
    NDArray[np.float32]
        ``self._rng.random(self._OBS_DIM)`` cast to ``float32``.
    """
    if seed is None:
        generator = self._rng
    else:
        generator = np.random.default_rng(seed)
        self._rng = generator
    sample = generator.random(self._OBS_DIM)
    return sample.astype(np.float32)


# _StubBackend.backend_step method · python · L74-L84 (11 LOC)src/agent_sim_bridge/convenience.py
def backend_step(
    self,
    action: NDArray[np.float32],
) -> tuple[NDArray[np.float32], float, bool, bool, dict[str, object]]:
    """Produce one stub transition.

    Parameters
    ----------
    action:
        Not used by the stub.

    Returns
    -------
    tuple
        ``(obs, reward, terminated, truncated, info)`` where ``obs`` is a
        fresh ``float32`` draw from ``self._rng`` of size ``self._OBS_DIM``,
        ``reward`` is ``1.0``, both flags are ``False`` and ``info`` is an
        empty dict.
    """
    observation = self._rng.random(self._OBS_DIM).astype(np.float32)
    return observation, 1.0, False, False, {}


# Simulator.__init__ method · python · L141-L156 (16 LOC)src/agent_sim_bridge/convenience.py
def __init__(
    self,
    scenario: "Scenario | str | None" = None,
    seed: int | None = None,
) -> None:
    """Set up the scenario, seed and stub backend.

    Parameters
    ----------
    scenario:
        ``None`` creates a scenario named ``"quickstart-scenario"``; a
        string creates a scenario with that name; any other value is
        stored as-is.
    seed:
        Stored as ``_seed`` and passed to the ``_StubBackend``.
    """
    from agent_sim_bridge.simulation.scenario import Scenario

    # Collapse the None case into the "name given as string" case.
    candidate = "quickstart-scenario" if scenario is None else scenario
    self._scenario: Scenario = (
        Scenario(name=candidate) if isinstance(candidate, str) else candidate
    )
    self._seed = seed
    self._backend = _StubBackend(seed=seed)


# Simulator.run method · python · L162-L199 (38 LOC)src/agent_sim_bridge/convenience.py
def run(self, steps: int = 50) -> "ExecutionResult":
"""Run a simulation episode and return the result.
A zero-action policy is used so no ML model is required.
Parameters
----------
steps:
Maximum number of timesteps to execute before truncation.
Returns
-------
ExecutionResult
Episode statistics: ``total_reward``, ``steps``, ``success``,
``terminated``, ``truncated``, ``wall_time_seconds``.
"""
from agent_sim_bridge.environment.sim_env import SimulationEnvironment
from agent_sim_bridge.simulation.sandbox import SimulationSandbox
env = SimulationEnvironment(
backend=self._backend,
name="quickstart-env",
max_episode_steps=steps,
)
sandbox = SimulationSandbox(
environment=env,
max_steps=steps,
timeout_seconds=None,
reset_seed=self._seed,
)
Powered by Repobility — scan your code at https://repobility.com
Simulator.outcome_met method · python · L210-L228 (19 LOC)src/agent_sim_bridge/convenience.py
def outcome_met(self, result: "ExecutionResult") -> bool:
    """Evaluate *result* against the scenario's outcome criteria.

    Parameters
    ----------
    result:
        :class:`~agent_sim_bridge.simulation.sandbox.ExecutionResult`
        returned by :meth:`run`. Its ``total_reward``, ``steps`` and
        ``terminated`` fields are forwarded to the scenario.

    Returns
    -------
    bool
        Whatever ``self._scenario.evaluate`` returns for those fields.
    """
    evaluation_inputs = {
        "total_reward": result.total_reward,
        "steps": result.steps,
        "terminated": result.terminated,
    }
    return self._scenario.evaluate(**evaluation_inputs)


# quick_recorder function · python · L239-L268 (30 LOC)src/agent_sim_bridge/convenience.py
def quick_recorder(max_steps: int | None = None) -> "TrajectoryRecorder":
    """Build a :class:`~agent_sim_bridge.simulation.recorder.TrajectoryRecorder`.

    The recorder class is imported lazily inside the function and
    constructed with ``max_steps`` forwarded unchanged.

    Parameters
    ----------
    max_steps:
        Maximum number of steps to record. ``None`` means no limit.

    Returns
    -------
    TrajectoryRecorder
        A newly constructed recorder.

    Example
    -------
    ::

        recorder = quick_recorder()
        recorder.record(obs, action, 1.0, next_obs, False, False)
        recorder.save("my_episode.npz")
        print(len(recorder))  # 1
    """
    from agent_sim_bridge.simulation.recorder import TrajectoryRecorder

    recorder = TrajectoryRecorder(max_steps=max_steps)
    return recorder


# quick_sandbox function · python · L271-L327 (57 LOC)src/agent_sim_bridge/convenience.py
def quick_sandbox(
environment: "Environment",
policy: Callable[[NDArray[np.float32]], NDArray[np.float32]],
max_steps: int = 1000,
timeout_seconds: float | None = 60.0,
record: bool = False,
seed: int | None = None,
) -> "ExecutionResult":
"""Run a single sandboxed episode and return the result.
Wraps :class:`~agent_sim_bridge.simulation.sandbox.SimulationSandbox`
for the common case of running one episode with a policy callable.
The sandbox enforces step and wall-clock limits automatically.
Parameters
----------
environment:
Any :class:`~agent_sim_bridge.environment.base.Environment` instance.
policy:
Callable that maps an observation array to an action array.
max_steps:
Hard cap on episode length before truncation.
timeout_seconds:
Maximum wall-clock seconds for the episode. Pass ``None`` to
disable the time limit.
record:
When ``True`` the returned
:class:`quick_safety_monitor function · python · L330-L394 (65 LOC)src/agent_sim_bridge/convenience.py
def quick_safety_monitor(
constraints: list[tuple[str, int, float, float]],
auto_stop_on_critical: bool = True,
) -> "SafetyMonitor":
"""Build a :class:`~agent_sim_bridge.safety.monitor.SafetyMonitor` from a compact spec.
Each element of *constraints* is a 4-tuple
``(name, dimension, min_value, max_value)`` that becomes a
:class:`~agent_sim_bridge.safety.constraints.SafetyConstraint` with
``ConstraintType.RANGE`` and ``ViolationSeverity.ERROR``.
For advanced constraint types (MAX_RATE, FORBIDDEN_ZONE, CUSTOM) or
custom severities, construct
:class:`~agent_sim_bridge.safety.constraints.SafetyConstraint` objects
directly and pass them to
:class:`~agent_sim_bridge.safety.monitor.SafetyMonitor`.
Parameters
----------
constraints:
List of ``(name, dimension, min_value, max_value)`` tuples.
auto_stop_on_critical:
Trigger emergency stop automatically on the first CRITICAL violation.
Returns
-------
quick_skill_library function · python · L397-L425 (29 LOC)src/agent_sim_bridge/convenience.py
def quick_skill_library() -> "SkillLibrary":
    """Build a :class:`~agent_sim_bridge.skills.library.SkillLibrary`.

    The library class is imported lazily inside the function and
    constructed with no arguments.

    Returns
    -------
    SkillLibrary
        A newly constructed skill registry.

    Example
    -------
    ::

        library = quick_skill_library()
        library.register(my_reach_skill)
        library.register(my_grasp_skill)
        skill = library.get("reach")
        print(library.list_names())
    """
    from agent_sim_bridge.skills.library import SkillLibrary

    library = SkillLibrary()
    return library


# quick_gap_analysis function · python · L428-L484 (57 LOC)src/agent_sim_bridge/convenience.py
def quick_gap_analysis(
sim_observations: list[list[float]],
real_observations: list[list[float]],
sim_rewards: list[float] | None = None,
real_rewards: list[float] | None = None,
metadata: dict[str, object] | None = None,
) -> "GapReport":
"""Measure the sim-to-real observation and reward gap in one call.
Wraps :class:`~agent_sim_bridge.metrics.gap.SimRealGap` for the common
case of comparing two parallel trajectories collected under the same
policy.
Parameters
----------
sim_observations:
Observation vectors from the simulation trajectory. Each element
is a list of floats representing one timestep.
real_observations:
Observation vectors from the real trajectory. Inner dimension must
match ``sim_observations``.
sim_rewards:
Scalar rewards from simulation (optional). When provided together
with ``real_rewards``, reward MAE and bias are included in the report.
real_rewards:
quick_transfer_bridge function · python · L487-L538 (52 LOC)src/agent_sim_bridge/convenience.py
def quick_transfer_bridge(
scale_factors: list[float],
offsets: list[float] | None = None,
) -> "TransferBridge":
"""Create a sim-to-real :class:`~agent_sim_bridge.transfer.bridge.TransferBridge`.
Constructs a :class:`~agent_sim_bridge.transfer.bridge.CalibrationProfile`
with the given per-dimension linear transform and wraps it in a
:class:`~agent_sim_bridge.transfer.bridge.TransferBridge` for immediate use.
Parameters
----------
scale_factors:
Per-dimension multiplicative scale applied as
``real[i] = sim[i] * scale[i] + offset[i]``.
offsets:
Per-dimension additive offsets. Defaults to all zeros when ``None``.
Returns
-------
TransferBridge
A configured bridge. Use
:meth:`~agent_sim_bridge.transfer.bridge.TransferBridge.sim_to_real`
and
:meth:`~agent_sim_bridge.transfer.bridge.TransferBridge.real_to_sim`
to convert observation or action vectors.
Raises
----DigitalTwin.__init__ method · python · L143-L149 (7 LOC)src/agent_sim_bridge/digital_twin/twin.py
def __init__(
    self,
    modified_agent: Callable[[object], object],
    output_comparator: Callable[[object], bool] | None = None,
) -> None:
    """Store the agent under test and the output comparator.

    Parameters
    ----------
    modified_agent:
        Callable invoked with each record's input during replay.
    output_comparator:
        Comparator to use. Only ``None`` selects
        ``self._default_comparator``; any other object is kept, even one
        that evaluates as falsy (for example a callable that defines
        ``__len__`` or ``__bool__``).
    """
    self._modified_agent = modified_agent
    # Compare against None explicitly: a truthiness test would silently
    # swap a falsy-but-valid comparator for the default.
    self._comparator = (
        self._default_comparator
        if output_comparator is None
        else output_comparator
    )
DigitalTwin.replay method · python · L155-L221 (67 LOC)src/agent_sim_bridge/digital_twin/twin.py
def replay(self, records: list[ReplayRecord]) -> TwinReport:
"""Replay all *records* through the modified agent.
Parameters
----------
records:
The production traffic records to replay.
Returns
-------
TwinReport
Comparison report.
"""
if not records:
return TwinReport(
total_records=0,
matched_records=0,
diverged_records=0,
match_rate=1.0,
divergence_points=[],
performance_delta_ms=0.0,
avg_latency_original_ms=0.0,
avg_latency_modified_ms=0.0,
generated_at=datetime.datetime.now(datetime.timezone.utc),
)
matched = 0
diverged = 0
divergence_points: list[DivergencePoint] = []
modified_latencies: list[float] = []
original_latencies: list[float] = []
for record in records:
DigitalTwin.replay_batch method · python · L223-L248 (26 LOC)src/agent_sim_bridge/digital_twin/twin.py
def replay_batch(
    self,
    records: list[ReplayRecord],
    batch_size: int = 100,
) -> list[TwinReport]:
    """Split *records* into consecutive batches and replay each one.

    Parameters
    ----------
    records:
        Full list of replay records.
    batch_size:
        Number of records per batch; the final batch may be shorter.

    Returns
    -------
    list[TwinReport]
        One :meth:`replay` result per batch, in order. Empty when
        *records* is empty.

    Raises
    ------
    ValueError
        If *batch_size* is zero or negative.
    """
    if batch_size <= 0:
        raise ValueError(f"batch_size must be positive, got {batch_size}")

    batch_offsets = range(0, len(records), batch_size)
    return [
        self.replay(records[offset : offset + batch_size])
        for offset in batch_offsets
    ]


# DigitalTwin._invoke_modified method · python · L254-L273 (20 LOC)src/agent_sim_bridge/digital_twin/twin.py
def _invoke_modified(self, record: ReplayRecord) -> tuple[object, float]:
    """Call the modified agent on one record and time the call.

    Any ``Exception`` raised by the agent is caught and turned into an
    ``{"error": <message>}`` output; the elapsed time still covers the
    failed call.

    Parameters
    ----------
    record:
        The replay record; its ``input_data`` is passed to the agent.

    Returns
    -------
    tuple[object, float]
        (output, latency_ms)
    """
    began = time.perf_counter()
    try:
        result = self._modified_agent(record.input_data)
    except Exception as failure:
        result = {"error": str(failure)}
    latency_ms = (time.perf_counter() - began) * 1000.0
    return result, latency_ms


# _scale_array function · python · L39-L53 (15 LOC)src/agent_sim_bridge/environment/adapter.py
def _scale_array(
    value: NDArray[np.float32],
    source_low: NDArray[np.float32],
    source_high: NDArray[np.float32],
    target_low: NDArray[np.float32],
    target_high: NDArray[np.float32],
) -> NDArray[np.float32]:
    """Linearly rescale *value* from source bounds to target bounds.

    Each element is mapped with
    ``(value - source_low) / (source_high - source_low)`` and then
    stretched onto ``[target_low, target_high]``. Where the source range
    is zero the mapping is undefined, so the midpoint of the target range
    is returned for that element regardless of *value*.

    Parameters
    ----------
    value:
        Array to rescale; broadcast against the bounds.
    source_low, source_high:
        Bounds of the space *value* currently lives in.
    target_low, target_high:
        Bounds of the space to map into.

    Returns
    -------
    NDArray[np.float32]
        The rescaled array, cast to ``float32``.
    """
    source_range = source_high - source_low
    target_range = target_high - target_low
    degenerate = source_range == 0.0
    # Divide by 1 on degenerate dimensions purely to avoid a zero division;
    # those entries are overwritten with the midpoint below.
    safe_range = np.where(degenerate, 1.0, source_range)
    normalised = (value - source_low) / safe_range
    scaled = normalised * target_range + target_low
    midpoint = target_low + target_range / 2.0
    return np.where(degenerate, midpoint, scaled).astype(np.float32)


# EnvironmentAdapter.__init__ method · python · L82-L103 (22 LOC)src/agent_sim_bridge/environment/adapter.py
def __init__(
self,
source: Environment,
target_state_space: SpaceSpec | None = None,
target_action_space: SpaceSpec | None = None,
obs_strategy: AdaptationStrategy = AdaptationStrategy.IDENTITY,
action_strategy: AdaptationStrategy = AdaptationStrategy.IDENTITY,
name_override: str | None = None,
) -> None:
self._source = source
self._target_state_space = target_state_space or source.state_space
self._target_action_space = target_action_space or source.action_space
self._obs_strategy = obs_strategy
self._action_strategy = action_strategy
source_info = source.info
self._info = EnvironmentInfo(
name=name_override or f"adapted({source_info.name})",
version=source_info.version,
is_simulation=source_info.is_simulation,
max_episode_steps=source_info.max_episode_steps,
metadata={**source_info.metadata, "adapter_stratEnvironmentAdapter._adapt_obs method · python · L125-L141 (17 LOC)src/agent_sim_bridge/environment/adapter.py
def _adapt_obs(self, obs: NDArray[np.float32]) -> NDArray[np.float32]:
    """Convert a source observation according to ``self._obs_strategy``.

    ``IDENTITY`` returns *obs* unchanged. ``CLIP`` clips it to the target
    state-space bounds. Any other strategy rescales it from the source
    state-space bounds to the target state-space bounds.
    """
    strategy = self._obs_strategy
    if strategy == AdaptationStrategy.IDENTITY:
        return obs

    source_space = self._source.state_space
    target_space = self._target_state_space

    if strategy == AdaptationStrategy.CLIP:
        clipped = np.clip(obs, target_space.low_array(), target_space.high_array())
        return clipped.astype(np.float32)

    # Remaining strategy: SCALE (source bounds -> target bounds).
    return _scale_array(
        obs,
        source_space.low_array(),
        source_space.high_array(),
        target_space.low_array(),
        target_space.high_array(),
    )


# EnvironmentAdapter._adapt_action method · python · L147-L163 (17 LOC)src/agent_sim_bridge/environment/adapter.py
def _adapt_action(self, action: NDArray[np.float32]) -> NDArray[np.float32]:
    """Convert an incoming action according to ``self._action_strategy``.

    ``IDENTITY`` returns *action* unchanged. ``CLIP`` clips it to the
    source action-space bounds. Any other strategy rescales it from the
    target action-space bounds to the source action-space bounds, i.e.
    the reverse direction of the observation mapping.
    """
    strategy = self._action_strategy
    if strategy == AdaptationStrategy.IDENTITY:
        return action

    source_space = self._source.action_space
    target_space = self._target_action_space

    if strategy == AdaptationStrategy.CLIP:
        clipped = np.clip(action, source_space.low_array(), source_space.high_array())
        return clipped.astype(np.float32)

    # Remaining strategy: SCALE (target bounds -> source bounds).
    return _scale_array(
        action,
        target_space.low_array(),
        target_space.high_array(),
        source_space.low_array(),
        source_space.high_array(),
    )


# EnvironmentAdapter.reset method · python · L169-L176 (8 LOC)src/agent_sim_bridge/environment/adapter.py
def reset(
    self,
    *,
    seed: int | None = None,
    options: dict[str, object] | None = None,
) -> tuple[NDArray[np.float32], dict[str, object]]:
    """Reset the wrapped environment and adapt its first observation.

    Parameters
    ----------
    seed:
        Forwarded unchanged to the source environment's ``reset``.
    options:
        Forwarded unchanged to the source environment's ``reset``.

    Returns
    -------
    tuple
        The observation after :meth:`_adapt_obs`, and the source's info
        dict as returned.
    """
    raw_obs, reset_info = self._source.reset(seed=seed, options=options)
    adapted_obs = self._adapt_obs(raw_obs)
    return adapted_obs, reset_info
EnvironmentAdapter.step method · python · L178-L188 (11 LOC)src/agent_sim_bridge/environment/adapter.py
def step(self, action: NDArray[np.float32]) -> StepResult:
    """Step the wrapped environment through the adapter.

    The action is converted with :meth:`_adapt_action` before it reaches
    the source environment, and the resulting observation is converted
    with :meth:`_adapt_obs`. Reward, termination flags and info are
    passed through untouched.
    """
    source_result = self._source.step(self._adapt_action(action))
    observation = self._adapt_obs(source_result.observation)
    return StepResult(
        observation,
        source_result.reward,
        source_result.terminated,
        source_result.truncated,
        source_result.info,
    )


# SpaceSpec.contains method · python · L52-L59 (8 LOC)src/agent_sim_bridge/environment/base.py
def contains(self, value: NDArray[np.float32]) -> bool:
    """Report whether *value* fits this space.

    *value* is converted to an array of ``self.dtype``. It fits when its
    shape equals ``self.shape`` and every element lies between the lower
    and upper bounds, inclusive.
    """
    candidate = np.asarray(value, dtype=self.dtype)
    if candidate.shape != self.shape:
        return False
    # Check the lower bound first; the upper bound is only consulted
    # when the lower one holds.
    if not np.all(candidate >= self.low_array()):
        return False
    return bool(np.all(candidate <= self.high_array()))


# Environment.reset method · python · L152-L173 (22 LOC)src/agent_sim_bridge/environment/base.py
def reset(
    self,
    *,
    seed: int | None = None,
    options: dict[str, object] | None = None,
) -> tuple[NDArray[np.float32], dict[str, object]]:
    """Reset the environment to its initial state.

    Both arguments are keyword-only. The body shown here consists of
    this docstring alone; NOTE(review): presumably declared abstract by
    a decorator outside this excerpt — confirm.

    Parameters
    ----------
    seed:
        Optional RNG seed for reproducibility.
    options:
        Implementation-specific reset options.

    Returns
    -------
    observation:
        Initial state observation.
    info:
        Auxiliary info dict (may be empty).
    """


# Environment.step method · python · L176-L191 (16 LOC)src/agent_sim_bridge/environment/base.py
def step(
    self,
    action: NDArray[np.float32],
) -> StepResult:
    """Advance the environment by one timestep.

    The body shown here consists of this docstring alone;
    NOTE(review): presumably declared abstract by a decorator outside
    this excerpt — confirm.

    Parameters
    ----------
    action:
        Control input array matching :attr:`action_space`.

    Returns
    -------
    StepResult
        ``(observation, reward, terminated, truncated, info)`` tuple.
    """


# Environment.act method · python · L201-L211 (11 LOC)src/agent_sim_bridge/environment/base.py
def act(self, action: NDArray[np.float32]) -> None:
    """Apply an action without computing reward or checking termination.

    Some physical systems separate actuation from observation reads;
    this method is provided as a lower-level primitive for those cases.
    The body shown here consists of this docstring alone, so the method
    as written does nothing and returns ``None``.

    Parameters
    ----------
    action:
        Control input array matching :attr:`action_space`.
    """


# Environment.__exit__ method · python · L227-L233 (7 LOC)src/agent_sim_bridge/environment/base.py
def __exit__(
    self,
    exc_type: type[BaseException] | None,
    exc_val: BaseException | None,
    exc_tb: object,
) -> None:
    """Close the environment when leaving a ``with`` block.

    The exception details are ignored. The method returns ``None``, so
    an exception raised inside the block is not suppressed.
    """
    self.close()
    return None


# Environment.__repr__ method · python · L235-L240 (6 LOC)src/agent_sim_bridge/environment/base.py
def __repr__(self) -> str:
    """Return ``ClassName(name=..., is_simulation=...)`` for debugging.

    The name is shown with ``repr`` quoting; both values are read from
    ``self.info``.
    """
    return "{}(name={!r}, is_simulation={})".format(
        self.__class__.__name__,
        self.info.name,
        self.info.is_simulation,
    )


# RealSystemInterface.system_reset method · python · L31-L36 (6 LOC)src/agent_sim_bridge/environment/real_env.py
def system_reset(
    self,
    options: dict[str, object] | None,
) -> NDArray[np.float32]:
    """Initialise / home the system and return the initial observation.

    Interface stub: the body is ``...`` and implementers supply the
    behaviour.

    Parameters
    ----------
    options:
        Reset options, or ``None``.
    """
    ...
RealSystemInterface.system_step method · python · L38-L43 (6 LOC)src/agent_sim_bridge/environment/real_env.py
def system_step(
    self,
    action: NDArray[np.float32],
) -> tuple[NDArray[np.float32], float, bool, bool, dict[str, object]]:
    """Send the action and return ``(obs, reward, terminated, truncated, info)``.

    Interface stub: the body is ``...`` and implementers supply the
    behaviour.

    Parameters
    ----------
    action:
        Action array to send to the system.
    """
    ...


# RealityEnvironment.__init__ method · python · L96-L118 (23 LOC)src/agent_sim_bridge/environment/real_env.py
def __init__(
    self,
    interface: RealSystemInterface,
    name: str = "real-environment",
    max_episode_steps: int = 500,
    step_timeout_seconds: float | None = 5.0,
    require_reset: bool = True,
) -> None:
    """Wrap a real-system interface as an environment.

    Parameters
    ----------
    interface:
        Object checked with ``isinstance`` against
        :class:`RealSystemInterface`.
    name:
        Name recorded in the environment info.
    max_episode_steps:
        Step limit recorded in the environment info.
    step_timeout_seconds:
        Stored as ``_step_timeout``.
    require_reset:
        Stored as ``_require_reset``.

    Raises
    ------
    TypeError
        If *interface* fails the ``isinstance`` check.
    """
    if not isinstance(interface, RealSystemInterface):
        raise TypeError(
            f"interface must satisfy RealSystemInterface protocol, got {type(interface)!r}"
        )

    environment_info = EnvironmentInfo(
        name=name,
        is_simulation=False,
        max_episode_steps=max_episode_steps,
    )

    # Configuration supplied by the caller.
    self._interface = interface
    self._info = environment_info
    self._step_timeout = step_timeout_seconds
    self._require_reset = require_reset

    # Mutable episode state, all starting from a clean slate.
    self._has_reset: bool = False
    self._episode_step: int = 0
    self._closed: bool = False


# RealityEnvironment.reset method · python · L144-L159 (16 LOC)src/agent_sim_bridge/environment/real_env.py
def reset(
    self,
    *,
    seed: int | None = None,
    options: dict[str, object] | None = None,
) -> tuple[NDArray[np.float32], dict[str, object]]:
    """Reset the real system and start a new episode.

    The environment is marked as reset only after the interface's
    ``system_reset`` has returned. If that call raises, the exception
    propagates and the environment stays marked as not reset.

    Parameters
    ----------
    seed:
        Ignored; a warning is logged when a non-``None`` value is given.
    options:
        Passed through to ``self._interface.system_reset``.

    Returns
    -------
    tuple
        The observation returned by the interface and an empty info dict.
    """
    self._ensure_open()
    if seed is not None:
        logger.warning(
            "%s is a real environment; seed=%s is ignored.", self._info.name, seed
        )
    # Invalidate any earlier reset first: if the hardware reset fails the
    # system state is unknown and must not be treated as ready.
    self._has_reset = False
    obs = self._interface.system_reset(options)
    self._episode_step = 0
    self._has_reset = True
    logger.debug("%s reset", self._info.name)
    return obs, {}