Function bodies 210 total
TrajectoryRecorder.load method · python · L217-L256 (40 LOC)src/agent_sim_bridge/simulation/recorder.py
def load(cls, path: str | Path) -> "TrajectoryRecorder":
"""Load a trajectory previously saved with :meth:`save`.
Parameters
----------
path:
Path to the ``.npz`` file.
Returns
-------
TrajectoryRecorder
A new recorder containing the loaded steps.
"""
source = Path(path)
if source.suffix != ".npz":
source = source.with_suffix(".npz")
data = np.load(source)
recorder = cls()
if "observations" not in data:
logger.warning("Loaded empty trajectory from %s", source)
return recorder
n_steps = int(data["observations"].shape[0])
for i in range(n_steps):
step = TrajectoryStep(
step_index=i,
observation=data["observations"][i].astype(np.float32),
action=data["actions"][i].astype(np.float32),
reward=float(data["rewards"][i]),
TrajectoryRecorder.to_bytes method · python · L258-L283 (26 LOC)src/agent_sim_bridge/simulation/recorder.py
def to_bytes(self) -> bytes:
"""Serialise to an in-memory bytes buffer (useful for streaming)."""
buffer = io.BytesIO()
arrays: dict[str, NDArray[np.float32]] = {}
if self._steps:
arrays["observations"] = np.stack(
[s.observation for s in self._steps], axis=0
)
arrays["actions"] = np.stack([s.action for s in self._steps], axis=0)
arrays["rewards"] = np.array(
[s.reward for s in self._steps], dtype=np.float32
)
arrays["next_observations"] = np.stack(
[s.next_observation for s in self._steps], axis=0
)
arrays["terminated"] = np.array(
[s.terminated for s in self._steps], dtype=bool
)
arrays["truncated"] = np.array(
[s.truncated for s in self._steps], dtype=bool
)
arrays["timestamps"] = np.array(
[s.timestamp for s inReplayResult.summary method · python · L48-L55 (8 LOC)src/agent_sim_bridge/simulation/replay.py
def summary(self) -> dict[str, float]:
"""Return key metrics as a plain dict."""
return {
"observation_mse": self.observation_mse,
"max_obs_divergence": self.max_obs_divergence,
"reward_correlation": self.reward_correlation,
"n_steps": float(len(self.original_steps)),
}TrajectoryReplay.__init__ method · python · L79-L85 (7 LOC)src/agent_sim_bridge/simulation/replay.py
def __init__(
self,
environment: Environment,
reset_seed: int | None = None,
) -> None:
self._env = environment
self._reset_seed = reset_seedTrajectoryReplay.replay method · python · L87-L166 (80 LOC)src/agent_sim_bridge/simulation/replay.py
def replay(
self,
recorder: TrajectoryRecorder,
stop_on_termination: bool = True,
) -> ReplayResult:
"""Execute all recorded actions in the environment and collect results.
Parameters
----------
recorder:
The trajectory to replay. Must contain at least one step.
stop_on_termination:
If True (default), replay stops early when the environment
signals termination, even if there are remaining steps.
Returns
-------
ReplayResult
Observations, divergence metrics, and reward statistics.
"""
steps = recorder.steps
if not steps:
logger.warning("TrajectoryReplay called with empty recorder.")
return ReplayResult(original_steps=[], replayed_observations=[])
obs, _ = self._env.reset(seed=self._reset_seed)
replayed_observations: list[NDArray[np.float32]] = []
replayed_rewards: list[flExecutionResult.summary method · python · L79-L89 (11 LOC)src/agent_sim_bridge/simulation/sandbox.py
def summary(self) -> dict[str, object]:
"""Return a plain-dict summary of key metrics."""
return {
"total_reward": self.total_reward,
"steps": self.steps,
"terminated": self.terminated,
"truncated": self.truncated,
"wall_time_seconds": self.wall_time_seconds,
"success": self.success,
"error": self.error,
}_timeout_context function · python · L93-L120 (28 LOC)src/agent_sim_bridge/simulation/sandbox.py
def _timeout_context(seconds: float) -> "Iterator[None]": # type: ignore[type-arg]
"""Context manager that raises SandboxTimeoutError after *seconds*.
Uses ``signal.SIGALRM`` on POSIX systems. On Windows (no SIGALRM),
the timeout is enforced only via manual wall-clock checks between steps.
"""
import sys
if seconds <= 0:
yield
return
if sys.platform != "win32":
def _handler(signum: int, frame: object) -> None: # noqa: ARG001
raise SandboxTimeoutError(
f"Simulation episode exceeded wall-clock timeout of {seconds:.1f}s."
)
old_handler = signal.signal(signal.SIGALRM, _handler)
signal.setitimer(signal.ITIMER_REAL, seconds)
try:
yield
finally:
signal.setitimer(signal.ITIMER_REAL, 0)
signal.signal(signal.SIGALRM, old_handler)
else:
# On Windows fall through; per-step wall-clock check is the guard.
yieldProvenance: Repobility (https://repobility.com) — every score reproducible from /scan/
SimulationSandbox.__init__ method · python · L155-L167 (13 LOC)src/agent_sim_bridge/simulation/sandbox.py
def __init__(
self,
environment: Environment,
max_steps: int = 1000,
timeout_seconds: float | None = 60.0,
record: bool = False,
reset_seed: int | None = None,
) -> None:
self._env = environment
self._max_steps = max_steps
self._timeout = timeout_seconds or 0.0
self._record = record
self._reset_seed = reset_seedSimulationSandbox.run method · python · L169-L245 (77 LOC)src/agent_sim_bridge/simulation/sandbox.py
def run(self, policy: PolicyCallable) -> ExecutionResult:
"""Execute a full episode using *policy* to select actions.
Parameters
----------
policy:
Callable ``(observation) -> action``.
Returns
-------
ExecutionResult
Episode statistics and optional trajectory recorder.
"""
recorder = TrajectoryRecorder() if self._record else None
result = ExecutionResult(recorder=recorder)
t_start = time.monotonic()
try:
with _timeout_context(self._timeout):
obs, _ = self._env.reset(seed=self._reset_seed)
total_reward = 0.0
steps = 0
for _ in range(self._max_steps):
# Per-step wall-clock guard (especially for Windows)
if self._timeout > 0.0:
elapsed = time.monotonic() - t_start
if elapsed > self._timeout:
Scenario.to_reset_options method · python · L77-L83 (7 LOC)src/agent_sim_bridge/simulation/scenario.py
def to_reset_options(self) -> dict[str, object]:
"""Merge initial_state and env_parameters into a reset-options dict."""
options: dict[str, object] = {}
if self.initial_state:
options["initial_state"] = dict(self.initial_state)
options.update(self.env_parameters)
return optionsScenario.evaluate method · python · L85-L103 (19 LOC)src/agent_sim_bridge/simulation/scenario.py
def evaluate(self, total_reward: float, steps: int, terminated: bool) -> bool:
"""Return True if the episode outcome satisfies this scenario's criteria.
Parameters
----------
total_reward:
Cumulative reward achieved in the episode.
steps:
Number of steps taken.
terminated:
Whether the episode ended in a terminal state.
"""
if self.outcome.min_reward is not None and total_reward < self.outcome.min_reward:
return False
if self.outcome.max_steps is not None and steps > self.outcome.max_steps:
return False
if self.outcome.require_termination and not terminated:
return False
return TrueScenarioManager.save method · python · L127-L147 (21 LOC)src/agent_sim_bridge/simulation/scenario.py
def save(self, scenario: Scenario) -> Path:
"""Write *scenario* to ``<directory>/<name>.yaml``.
Parameters
----------
scenario:
The scenario to persist.
Returns
-------
Path
The file path the scenario was written to.
"""
self._directory.mkdir(parents=True, exist_ok=True)
filename = self._name_to_filename(scenario.name)
path = self._directory / filename
data = scenario.model_dump()
with path.open("w", encoding="utf-8") as f:
yaml.safe_dump(data, f, default_flow_style=False, allow_unicode=True)
logger.info("Saved scenario %r to %s", scenario.name, path)
return pathScenarioManager.load method · python · L149-L177 (29 LOC)src/agent_sim_bridge/simulation/scenario.py
def load(self, name: str) -> Scenario:
"""Load a scenario by name.
Parameters
----------
name:
The scenario's ``name`` field (not the file name).
Returns
-------
Scenario
Raises
------
FileNotFoundError
If no file exists for the given name.
"""
filename = self._name_to_filename(name)
path = self._directory / filename
if not path.exists():
raise FileNotFoundError(
f"Scenario {name!r} not found at {path}. "
f"Available scenarios: {self.list_names()}"
)
with path.open("r", encoding="utf-8") as f:
data = yaml.safe_load(f)
scenario = Scenario.model_validate(data)
logger.debug("Loaded scenario %r from %s", name, path)
return scenarioScenarioManager.delete method · python · L179-L197 (19 LOC)src/agent_sim_bridge/simulation/scenario.py
def delete(self, name: str) -> None:
"""Remove a scenario file.
Parameters
----------
name:
Scenario name.
Raises
------
FileNotFoundError
If the scenario does not exist.
"""
filename = self._name_to_filename(name)
path = self._directory / filename
if not path.exists():
raise FileNotFoundError(f"Scenario {name!r} not found at {path}.")
path.unlink()
logger.info("Deleted scenario %r (%s)", name, path)ScenarioManager.list_names method · python · L203-L215 (13 LOC)src/agent_sim_bridge/simulation/scenario.py
def list_names(self) -> list[str]:
"""Return sorted list of all scenario names on disk."""
if not self._directory.exists():
return []
names: list[str] = []
for yaml_path in sorted(self._directory.glob("*.yaml")):
try:
with yaml_path.open("r", encoding="utf-8") as f:
data = yaml.safe_load(f)
names.append(str(data.get("name", yaml_path.stem)))
except Exception: # noqa: BLE001
logger.warning("Could not parse scenario file %s", yaml_path)
return namesRepobility · code-quality intelligence · https://repobility.com
SkillResult.summary method · python · L65-L74 (10 LOC)src/agent_sim_bridge/skills/base.py
def summary(self) -> dict[str, object]:
"""Return a plain-dict summary."""
return {
"skill_name": self.skill_name,
"status": self.status.value,
"total_reward": self.total_reward,
"steps_taken": self.steps_taken,
"succeeded": self.succeeded,
"error": self.error,
}Skill.execute method · python · L106-L126 (21 LOC)src/agent_sim_bridge/skills/base.py
def execute(
self,
env: object,
context: dict[str, object] | None = None,
) -> SkillResult:
"""Run the skill in ``env``.
Parameters
----------
env:
The environment to act in. The skill calls ``env.step(action)``
internally and must not call ``env.reset()``.
context:
Optional key/value context from the caller (e.g., goal position,
prior skill results).
Returns
-------
SkillResult
The execution outcome.
"""CompositeSkill.__init__ method · python · L40-L51 (12 LOC)src/agent_sim_bridge/skills/composer.py
def __init__(
self,
name: str,
skills: list[Skill],
continue_on_failure: bool = False,
max_steps: int | None = None,
) -> None:
super().__init__(name=name, max_steps=max_steps)
if not skills:
raise ValueError(f"CompositeSkill {name!r} must contain at least one sub-skill.")
self._skills = list(skills)
self._continue_on_failure = continue_on_failureCompositeSkill.execute method · python · L58-L140 (83 LOC)src/agent_sim_bridge/skills/composer.py
def execute(
self,
env: object,
context: dict[str, object] | None = None,
) -> SkillResult:
"""Execute all sub-skills sequentially.
Parameters
----------
env:
The environment shared across all sub-skills.
context:
Optional context passed to each sub-skill. Sub-skill results
are merged into a copy of the context so later skills can read
earlier results under the key ``"<skill_name>_result"``.
Returns
-------
SkillResult
Aggregated result with cumulative reward and step count.
"""
running_context: dict[str, object] = dict(context or {})
total_reward = 0.0
total_steps = 0
sub_results: list[SkillResult] = []
final_status = SkillStatus.SUCCESS
error_message: str | None = None
for skill in self._skills:
# Enforce composite step budget.
if self._maxCompositeSkill.__repr__ method · python · L142-L147 (6 LOC)src/agent_sim_bridge/skills/composer.py
def __repr__(self) -> str:
return (
f"CompositeSkill(name={self._name!r}, "
f"n_skills={len(self._skills)}, "
f"continue_on_failure={self._continue_on_failure})"
)SkillComposer.compose method · python · L168-L203 (36 LOC)src/agent_sim_bridge/skills/composer.py
def compose(
self,
name: str,
skills: list[Skill],
continue_on_failure: bool = False,
max_steps: int | None = None,
) -> CompositeSkill:
"""Create a :class:`CompositeSkill` from a list of sub-skills.
Parameters
----------
name:
Name for the new composite skill.
skills:
Ordered sub-skills to chain.
continue_on_failure:
When True, all sub-skills run regardless of individual failures.
max_steps:
Combined step budget.
Returns
-------
CompositeSkill
"""
composite = CompositeSkill(
name=name,
skills=skills,
continue_on_failure=continue_on_failure,
max_steps=max_steps,
)
logger.debug(
"Composed skill %r from %d sub-skills.",
name,
len(skills),
)
return compositeSkillLibrary.register method · python · L49-L68 (20 LOC)src/agent_sim_bridge/skills/library.py
def register(self, skill: Skill) -> None:
"""Add a skill to the library.
Parameters
----------
skill:
The :class:`~agent_sim_bridge.skills.base.Skill` instance to register.
Raises
------
ValueError
If a skill with the same name is already registered.
"""
if skill.name in self._skills:
raise ValueError(
f"Skill {skill.name!r} is already in the library. "
"Use a unique name or remove the existing skill first."
)
self._skills[skill.name] = skill
logger.debug("Registered skill %r (%s).", skill.name, type(skill).__name__)SkillLibrary.register_or_replace method · python · L70-L83 (14 LOC)src/agent_sim_bridge/skills/library.py
def register_or_replace(self, skill: Skill) -> None:
"""Register a skill, silently replacing any existing skill with the same name.
Parameters
----------
skill:
The :class:`~agent_sim_bridge.skills.base.Skill` instance to register.
"""
replaced = skill.name in self._skills
self._skills[skill.name] = skill
if replaced:
logger.debug("Replaced existing skill %r.", skill.name)
else:
logger.debug("Registered skill %r (%s).", skill.name, type(skill).__name__)Want fix-PRs on findings? Install Repobility's GitHub App · github.com/apps/repobility-bot
SkillLibrary.remove method · python · L85-L101 (17 LOC)src/agent_sim_bridge/skills/library.py
def remove(self, name: str) -> None:
"""Remove a skill from the library by name.
Parameters
----------
name:
The skill name.
Raises
------
SkillNotFoundError
If no skill with that name exists.
"""
if name not in self._skills:
raise SkillNotFoundError(name)
del self._skills[name]
logger.debug("Removed skill %r from library.", name)SkillLibrary.get method · python · L103-L123 (21 LOC)src/agent_sim_bridge/skills/library.py
def get(self, name: str) -> Skill:
"""Retrieve a skill by exact name.
Parameters
----------
name:
The skill name used at registration.
Returns
-------
Skill
Raises
------
SkillNotFoundError
If no skill with that name exists.
"""
try:
return self._skills[name]
except KeyError:
raise SkillNotFoundError(name) from NoneSkillLibrary.search method · python · L125-L149 (25 LOC)src/agent_sim_bridge/skills/library.py
def search(self, query: str) -> list[Skill]:
"""Find skills whose name or tags contain ``query`` (case-insensitive).
Parameters
----------
query:
Substring to search for. An empty string returns all skills.
Returns
-------
list[Skill]
Matching skills, sorted alphabetically by name.
"""
lower_query = query.lower()
results: list[Skill] = []
for skill in self._skills.values():
if lower_query in skill.name.lower():
results.append(skill)
continue
# Check tags if the skill exposes them.
tags: list[str] = getattr(skill, "tags", [])
if any(lower_query in tag.lower() for tag in tags):
results.append(skill)
results.sort(key=lambda s: s.name)
return resultsSkillLibrary.list_all method · python · L151-L158 (8 LOC)src/agent_sim_bridge/skills/library.py
def list_all(self) -> list[Skill]:
"""Return all registered skills sorted alphabetically by name.
Returns
-------
list[Skill]
"""
return sorted(self._skills.values(), key=lambda s: s.name)ChaosConfig.__post_init__ method · python · L58-L65 (8 LOC)src/agent_sim_bridge/staging/chaos.py
def __post_init__(self) -> None:
self._validate_probability("network_partition_probability", self.network_partition_probability)
self._validate_probability("model_degradation_rate", self.model_degradation_rate)
self._validate_probability("tool_failure_rate", self.tool_failure_rate)
if self.latency_injection_ms < 0:
raise ValueError(
f"latency_injection_ms must be >= 0, got {self.latency_injection_ms}."
)ChaosEngine.__init__ method · python · L92-L97 (6 LOC)src/agent_sim_bridge/staging/chaos.py
def __init__(self, config: ChaosConfig) -> None:
self._config = config
self._rng = random.Random(config.random_seed)
self._partition_count: int = 0
self._latency_total_ms: float = 0.0
self._degradation_count: int = 0ChaosEngine.should_partition method · python · L119-L136 (18 LOC)src/agent_sim_bridge/staging/chaos.py
def should_partition(self) -> bool:
"""Return True if this interaction should be dropped as a partition.
Uses the configured :attr:`ChaosConfig.network_partition_probability`.
Returns
-------
bool
``True`` means the call should be treated as if the network
is unavailable.
"""
if self._config.network_partition_probability <= 0.0:
return False
result = self._rng.random() < self._config.network_partition_probability
if result:
self._partition_count += 1
logger.debug("ChaosEngine: network partition injected.")
return resultChaosEngine.inject_latency method · python · L138-L154 (17 LOC)src/agent_sim_bridge/staging/chaos.py
def inject_latency(self) -> float:
"""Return the configured latency injection amount in milliseconds.
This is a deterministic value (not randomised) — the full
``latency_injection_ms`` is always returned when non-zero. For
random latency, wrap this in the caller.
Returns
-------
float
Latency in milliseconds to add (>= 0.0).
"""
latency = self._config.latency_injection_ms
if latency > 0.0:
self._latency_total_ms += latency
logger.debug("ChaosEngine: injecting %.2f ms latency.", latency)
return latencyRepobility analyzer · published findings · https://repobility.com
ChaosEngine.degrade_output method · python · L156-L186 (31 LOC)src/agent_sim_bridge/staging/chaos.py
def degrade_output(self, output: str) -> str:
"""Degrade *output* according to the configured degradation rate.
Degradation is simulated by truncating the string: a rate of 0.5
removes the last 50% of characters; a rate of 1.0 returns an
empty string. No random noise is added so that results are
deterministic for a given input.
Parameters
----------
output:
Original model output string.
Returns
-------
str
Degraded output string.
"""
rate = self._config.model_degradation_rate
if rate <= 0.0:
return output
keep_fraction = 1.0 - rate
keep_chars = max(0, int(len(output) * keep_fraction))
degraded = output[:keep_chars]
self._degradation_count += 1
logger.debug(
"ChaosEngine: degraded output from %d to %d chars.",
len(output),
len(degraded),
)
retChaosEngine.should_fail_tool method · python · L188-L200 (13 LOC)src/agent_sim_bridge/staging/chaos.py
def should_fail_tool(self) -> bool:
"""Return True if a simulated tool call should fail.
Uses the configured :attr:`ChaosConfig.tool_failure_rate`.
Returns
-------
bool
``True`` means the tool should raise an error.
"""
if self._config.tool_failure_rate <= 0.0:
return False
return self._rng.random() < self._config.tool_failure_rateChaosEngine.__repr__ method · python · L208-L215 (8 LOC)src/agent_sim_bridge/staging/chaos.py
def __repr__(self) -> str:
return (
f"ChaosEngine("
f"partition_prob={self._config.network_partition_probability}, "
f"latency_ms={self._config.latency_injection_ms}, "
f"degradation_rate={self._config.model_degradation_rate}"
f")"
)StagingReport.__post_init__ method · python · L65-L72 (8 LOC)src/agent_sim_bridge/staging/environment.py
def __post_init__(self) -> None:
if self.total_scenarios < 0:
raise ValueError("total_scenarios must be >= 0.")
if self.passed + self.failed > self.total_scenarios:
raise ValueError(
f"passed ({self.passed}) + failed ({self.failed}) "
f"exceeds total_scenarios ({self.total_scenarios})."
)StagingEnvironment.__init__ method · python · L115-L124 (10 LOC)src/agent_sim_bridge/staging/environment.py
def __init__(
self,
chaos_config: ChaosConfig | None = None,
pass_threshold: float = 0.5,
) -> None:
self._chaos_config = chaos_config or ChaosConfig()
self._chaos_engine = ChaosEngine(self._chaos_config)
self._pass_threshold = pass_threshold
self._users: list[SimulatedUser] = []
self._tools: dict[str, SimulatedTool] = {}StagingEnvironment.add_user method · python · L130-L144 (15 LOC)src/agent_sim_bridge/staging/environment.py
def add_user(self, profile: UserProfile) -> None:
"""Register a simulated user in this staging environment.
Parameters
----------
profile:
The user profile to add.
"""
user = SimulatedUser(profile)
self._users.append(user)
logger.debug(
"StagingEnvironment: added user %r (persona=%r).",
profile.name,
profile.persona,
)StagingEnvironment.add_tool method · python · L146-L164 (19 LOC)src/agent_sim_bridge/staging/environment.py
def add_tool(self, behavior: ToolBehavior) -> None:
"""Register a simulated tool in this staging environment.
Parameters
----------
behavior:
The tool behaviour to add. The tool is keyed by
``behavior.name``. Adding a tool with a duplicate name
replaces the previous one.
"""
self._tools[behavior.name] = SimulatedTool(
behavior,
random_seed=self._chaos_config.random_seed,
)
logger.debug(
"StagingEnvironment: added tool %r (success_rate=%.2f).",
behavior.name,
behavior.success_rate,
)StagingEnvironment.run_scenario method · python · L180-L311 (132 LOC)src/agent_sim_bridge/staging/environment.py
async def run_scenario(
self,
agent_fn: Callable[...],
scenario_name: str,
) -> StagingTestResult:
"""Execute *agent_fn* against all registered users and record results.
The agent function is called with each user's messages in sequence.
After each agent response the user's satisfaction is recorded.
Chaos events are applied per-call.
The agent function signature must be::
async def agent_fn(
message: str,
tools: dict[str, SimulatedTool],
context: dict[str, object],
) -> str: ...
or a synchronous equivalent (detected automatically).
Parameters
----------
agent_fn:
The agent function to test.
scenario_name:
Human-readable label for this scenario run.
Returns
-------
StagingTestResult
Immutable result record for this scenario.
"""
Provenance: Repobility (https://repobility.com) — every score reproducible from /scan/
StagingEnvironment.aggregate_results method · python · L317-L364 (48 LOC)src/agent_sim_bridge/staging/environment.py
def aggregate_results(
self,
results: list[StagingTestResult],
) -> StagingReport:
"""Aggregate a list of :class:`StagingTestResult` into a report.
Parameters
----------
results:
List of scenario results to aggregate.
Returns
-------
StagingReport
Immutable aggregate report.
"""
total = len(results)
passed = sum(1 for r in results if r.passed)
failed = total - passed
avg_satisfaction = (
sum(r.user_satisfaction for r in results) / total
if total > 0
else 0.0
)
avg_latency = (
sum(r.duration_ms for r in results) / total
if total > 0
else 0.0
)
chaos_total = sum(r.chaos_event_count for r in results)
report = StagingReport(
total_scenarios=total,
passed=passed,
failed=failed,
average_saStagingEnvironment.__repr__ method · python · L366-L373 (8 LOC)src/agent_sim_bridge/staging/environment.py
def __repr__(self) -> str:
return (
f"StagingEnvironment("
f"users={len(self._users)}, "
f"tools={len(self._tools)}, "
f"chaos={self._chaos_config!r}"
f")"
)StagingTestResult.__post_init__ method · python · L49-L59 (11 LOC)src/agent_sim_bridge/staging/results.py
def __post_init__(self) -> None:
if not self.test_name:
raise ValueError("StagingTestResult.test_name must not be empty.")
if self.duration_ms < 0:
raise ValueError(
f"duration_ms must be >= 0, got {self.duration_ms}."
)
if not (0.0 <= self.user_satisfaction <= 1.0):
raise ValueError(
f"user_satisfaction must be in [0.0, 1.0], got {self.user_satisfaction}."
)ToolBehavior.__post_init__ method · python · L50-L60 (11 LOC)src/agent_sim_bridge/staging/simulated_tools.py
def __post_init__(self) -> None:
if not self.name:
raise ValueError("ToolBehavior.name must not be empty.")
if not (0.0 <= self.success_rate <= 1.0):
raise ValueError(
f"success_rate must be in [0.0, 1.0], got {self.success_rate}."
)
if self.latency_ms < 0:
raise ValueError(
f"latency_ms must be >= 0, got {self.latency_ms}."
)SimulatedTool.__init__ method · python · L84-L93 (10 LOC)src/agent_sim_bridge/staging/simulated_tools.py
def __init__(
self,
behavior: ToolBehavior,
random_seed: int | None = None,
) -> None:
self._behavior = behavior
self._rng = random.Random(random_seed)
self._call_count: int = 0
self._success_count: int = 0
self._failure_count: int = 0SimulatedTool.execute method · python · L120-L174 (55 LOC)src/agent_sim_bridge/staging/simulated_tools.py
async def execute(self, **kwargs: object) -> dict[str, object]:
"""Execute the simulated tool.
Simulates latency via ``asyncio.sleep`` then either returns a
success payload or raises :class:`SimulatedToolError`.
Parameters
----------
**kwargs:
Arbitrary tool input parameters (passed through to the
success payload for traceability).
Returns
-------
dict[str, object]
Success payload containing ``"tool"``, ``"status"``,
``"call_count"``, and ``"inputs"`` keys.
Raises
------
SimulatedToolError
When the random draw exceeds the configured success rate.
"""
self._call_count += 1
call_number = self._call_count
latency_seconds = self._behavior.latency_ms / 1000.0
if latency_seconds > 0:
await asyncio.sleep(latency_seconds)
if self._rng.random() > self._behavior.success_ratSimulatedTool.__repr__ method · python · L182-L189 (8 LOC)src/agent_sim_bridge/staging/simulated_tools.py
def __repr__(self) -> str:
return (
f"SimulatedTool("
f"name={self._behavior.name!r}, "
f"success_rate={self._behavior.success_rate}, "
f"calls={self._call_count}"
f")"
)SimulatedToolError.__init__ method · python · L205-L214 (10 LOC)src/agent_sim_bridge/staging/simulated_tools.py
def __init__(
self,
tool_name: str,
message: str,
call_number: int,
) -> None:
self.tool_name = tool_name
self.message = message
self.call_number = call_number
super().__init__(f"[{tool_name}] call #{call_number}: {message}")Repobility · code-quality intelligence · https://repobility.com
SimulatedUser.next_message method · python · L107-L129 (23 LOC)src/agent_sim_bridge/staging/simulated_users.py
def next_message(self) -> str | None:
"""Return the next message in sequence, or ``None`` when exhausted.
Advances the internal cursor on each call.
Returns
-------
str | None
The next user message, or ``None`` if all messages have
been sent.
"""
if self._cursor >= len(self._profile.messages):
return None
message = self._profile.messages[self._cursor]
self._cursor += 1
logger.debug(
"SimulatedUser[%s]: sending message %d/%d: %r",
self._profile.name,
self._cursor,
len(self._profile.messages),
message,
)
return messageSimulatedUser.evaluate_response method · python · L131-L167 (37 LOC)src/agent_sim_bridge/staging/simulated_users.py
def evaluate_response(self, response: str) -> float:
"""Score *response* against the user's expected outcomes.
Computes a satisfaction score in [0.0, 1.0] by counting how many
``expected_outcomes`` keywords appear in *response* (case-insensitive).
If no expected outcomes are defined, returns 1.0 (fully satisfied).
Parameters
----------
response:
The agent's textual response to evaluate.
Returns
-------
float
Satisfaction score: 1.0 = all outcomes found, 0.0 = none found.
"""
expected = self._profile.expected_outcomes
if not expected:
score = 1.0
else:
response_lower = response.lower()
matched = sum(
1
for outcome in expected
if outcome.lower() in response_lower
)
score = matched / len(expected)
self._satisfaction_scores.append(scoSimulatedUser.average_satisfaction method · python · L169-L181 (13 LOC)src/agent_sim_bridge/staging/simulated_users.py
def average_satisfaction(self) -> float:
"""Return the mean satisfaction score across all evaluated responses.
Returns 0.0 if no responses have been evaluated.
Returns
-------
float
Average satisfaction score in [0.0, 1.0].
"""
if not self._satisfaction_scores:
return 0.0
return sum(self._satisfaction_scores) / len(self._satisfaction_scores)