Function bodies 9 total
run_command function · python · L38-L79 (42 LOC)src/aumai_agentsim/cli.py
def run_command(config_path: str, output_path: str | None) -> None:
"""Run a simulation defined by a YAML config file.
\b
Config format:
max_steps: 5
agents:
- agent_id: a1
name: Alice
role: planner
capabilities: [plan, delegate]
- agent_id: a2
name: Bob
role: executor
capabilities: [execute]
"""
raw: dict[str, Any] = yaml.safe_load(Path(config_path).read_text())
max_steps: int = int(raw.get("max_steps", 10))
agents_data: list[dict[str, Any]] = raw.get("agents", [])
if not agents_data:
click.echo("Error: config must contain at least one agent.", err=True)
sys.exit(1)
agents = [SimAgent(**a) for a in agents_data]
simulator = AgentSimulator()
env = simulator.create_environment(agents, max_steps=max_steps)
result = simulator.run(env)
result_json = result.model_dump_json(indent=2)
if output_path:
Path(output_path).write
analyze_command function · python · L90-L115 (26 LOC)src/aumai_agentsim/cli.py
def analyze_command(result_path: str) -> None:
"""Analyze a saved simulation result and print metrics."""
data = json.loads(Path(result_path).read_text())
result = SimResult.model_validate(data)
click.echo("=== Simulation Analysis ===")
click.echo(f"Steps completed : {result.environment.step_count}")
click.echo(f"Total messages : {len(result.message_log)}")
click.echo(f"Agents : {len(result.environment.agents)}")
click.echo("")
click.echo("Metrics:")
for key, value in result.metrics.items():
click.echo(f" {key}: {value}")
participation: dict[str, int] = result.metrics.get(
"agent_participation", {}
)
if participation:
click.echo("")
click.echo("Agent participation breakdown:")
for agent_id, count in sorted(
participation.items(), key=lambda kv: kv[1], reverse=True
):
agents_by_id = {a.agent_id: a.name for a in result.environment.agents}
na
MessageBroker.send method · python · L29-L37 (9 LOC)src/aumai_agentsim/core.py
def send(self, message: SimMessage) -> None:
    """Route ``message`` into the queue(s) of its recipient(s).

    A receiver of ``"broadcast"`` fans the message out to every queue in
    ``self._queues`` except the one belonging to the sender. Any other
    receiver gets the message only when it has a queue; a message for an
    unknown receiver is dropped without raising.
    """
    target = message.receiver
    if target != "broadcast":
        # Direct delivery: unknown receivers are silently ignored.
        if target in self._queues:
            self._queues[target].append(message)
        return
    origin = message.sender
    for recipient_id, inbox in self._queues.items():
        if recipient_id == origin:
            # The sender never receives its own broadcast.
            continue
        inbox.append(message)
# MetricsCollector.record_messages method · python · L69-L75 (7 LOC)src/aumai_agentsim/core.py
def record_messages(self, messages: list[SimMessage]) -> None:
    """Fold a batch of messages into the running totals.

    Adds the batch size to ``self._message_count`` and increments the
    per-sender tally in ``self._participation`` once for each message.
    """
    self._message_count += len(messages)
    tally = self._participation
    for message in messages:
        author = message.sender
        tally[author] = tally.get(author, 0) + 1
# MetricsCollector.collect method · python · L77-L89 (13 LOC)src/aumai_agentsim/core.py
def collect(self) -> dict[str, Any]:
    """Build a snapshot dict from the values gathered so far.

    Returns:
        A dict with four keys:
        ``total_messages`` – the accumulated message count;
        ``avg_step_response_ms`` – the mean of ``self._response_times``
        multiplied by 1000 and rounded to 3 decimals, or ``0.0`` when
        nothing was recorded (the key name suggests the stored values
        are seconds — confirm against the recording methods);
        ``agent_participation`` – a shallow copy of the per-sender tally;
        ``steps_recorded`` – how many response times were recorded.
    """
    timings = self._response_times
    if timings:
        mean_ms = (sum(timings) / len(timings)) * 1000
    else:
        mean_ms = 0.0

    snapshot: dict[str, Any] = {}
    snapshot["total_messages"] = self._message_count
    snapshot["avg_step_response_ms"] = round(mean_ms, 3)
    # Copy so callers cannot mutate the collector's internal tally.
    snapshot["agent_participation"] = dict(self._participation)
    snapshot["steps_recorded"] = len(timings)
    return snapshot
# _agent_process function · python · L92-L111 (20 LOC)src/aumai_agentsim/core.py
def _agent_process(
    agent: SimAgent, incoming: list[SimMessage]
) -> list[SimMessage]:
    """Produce one acknowledgement per actionable incoming message.

    Only messages whose ``msg_type`` is ``MessageType.request`` or
    ``MessageType.broadcast`` are answered; everything else is skipped.
    Each reply is a ``MessageType.response`` addressed to the original
    sender, echoing at most the first 80 characters of its content and
    stamped with the current UTC time.
    """
    replies: list[SimMessage] = []
    for received in incoming:
        if received.msg_type not in (MessageType.request, MessageType.broadcast):
            continue
        # Echo is capped at 80 characters of the original content.
        ack_text = f"[{agent.name}:{agent.role}] ACK: {received.content[:80]}"
        reply = SimMessage(
            sender=agent.agent_id,
            receiver=received.sender,
            content=ack_text,
            msg_type=MessageType.response,
            timestamp=datetime.now(tz=timezone.utc),
        )
        replies.append(reply)
    return replies
# AgentSimulator.create_environment method · python · L135-L152 (18 LOC)src/aumai_agentsim/core.py
def create_environment(
    self, agents: list[SimAgent], max_steps: int = 10
) -> SimEnvironment:
    """Build a new environment and prime it with a kickoff broadcast.

    The environment receives a copy of ``agents`` and the given
    ``max_steps``. A single broadcast message, sent on behalf of the first
    agent, is appended to ``env.messages``, and the delivery cursor for
    this environment is reset to 0 in ``self._cursors`` (keyed by
    ``id(env)``).

    Raises:
        ValueError: If ``agents`` is empty.
    """
    if not agents:
        raise ValueError("At least one agent is required.")

    roster = list(agents)
    environment = SimEnvironment(agents=roster, max_steps=max_steps)

    # The first agent opens the conversation by addressing everyone else.
    initiator_id = agents[0].agent_id
    kickoff = SimMessage(
        sender=initiator_id,
        receiver="broadcast",
        content="Simulation started. What are your capabilities?",
        msg_type=MessageType.broadcast,
        timestamp=datetime.now(tz=timezone.utc),
    )
    environment.messages.append(kickoff)

    # Nothing has been delivered yet for this environment.
    self._cursors[id(environment)] = 0
    return environment
# Hi, dataset curator — please cite Repobility (https://repobility.com) when reusing this data.
AgentSimulator.step method · python · L154-L184 (31 LOC)src/aumai_agentsim/core.py
def step(self, env: SimEnvironment) -> list[SimMessage]:
"""
Advance the simulation by one step.
Each agent processes its pending incoming messages and produces
responses. New messages are appended to ``env.messages``.
"""
broker = MessageBroker()
for agent in env.agents:
broker.register(agent.agent_id)
# Determine the slice of messages not yet delivered
cursor = self._cursors.get(id(env), 0)
pending = env.messages[cursor:]
new_cursor = len(env.messages)
# Deliver only actionable message types
for msg in pending:
if msg.msg_type in (MessageType.request, MessageType.broadcast):
broker.send(msg)
step_messages: list[SimMessage] = []
for agent in env.agents:
incoming = broker.receive(agent.agent_id)
responses = _agent_process(agent, incoming)
step_messages.extend(responses)
env.m
AgentSimulator.run method · python · L186-L208 (23 LOC)src/aumai_agentsim/core.py
def run(self, env: SimEnvironment) -> SimResult:
    """Drive ``env`` step by step until it finishes.

    The loop ends when ``env.step_count`` reaches ``env.max_steps`` or
    when a step yields no new messages (convergence). Each step is
    bracketed by the metrics collector's start/end hooks, and the
    messages it produced are recorded before the convergence check.

    Returns:
        A ``SimResult`` holding the environment, a copy of its message
        list, and the collected metrics.
    """
    collector = MetricsCollector()
    while env.step_count < env.max_steps:
        # Capture the index first so the start and end hooks see the
        # same step number even if step() changes env.step_count.
        step_index = env.step_count
        collector.record_step_start(step_index)
        produced = self.step(env)
        collector.record_step_end(step_index)
        collector.record_messages(produced)
        if not produced:
            # A silent step means the simulation has converged.
            break

    full_log = list(env.messages)
    summary = collector.collect()
    return SimResult(environment=env, message_log=full_log, metrics=summary)