← back to invincible-jha__aumai-agentsim

Function bodies 9 total

All specs Real LLM only Function bodies
run_command function · python · L38-L79 (42 LOC)
src/aumai_agentsim/cli.py
def run_command(config_path: str, output_path: str | None) -> None:
    """Run a simulation defined by a YAML config file.

    \b
    Config format:
      max_steps: 5
      agents:
        - agent_id: a1
          name: Alice
          role: planner
          capabilities: [plan, delegate]
        - agent_id: a2
          name: Bob
          role: executor
          capabilities: [execute]
    """
    raw: dict[str, Any] = yaml.safe_load(Path(config_path).read_text())
    max_steps: int = int(raw.get("max_steps", 10))
    agents_data: list[dict[str, Any]] = raw.get("agents", [])

    if not agents_data:
        click.echo("Error: config must contain at least one agent.", err=True)
        sys.exit(1)

    agents = [SimAgent(**a) for a in agents_data]
    simulator = AgentSimulator()
    env = simulator.create_environment(agents, max_steps=max_steps)
    result = simulator.run(env)

    result_json = result.model_dump_json(indent=2)

    if output_path:
        Path(output_path).write
analyze_command function · python · L90-L115 (26 LOC)
src/aumai_agentsim/cli.py
def analyze_command(result_path: str) -> None:
    """Analyze a saved simulation result and print metrics."""
    data = json.loads(Path(result_path).read_text())
    result = SimResult.model_validate(data)

    click.echo("=== Simulation Analysis ===")
    click.echo(f"Steps completed : {result.environment.step_count}")
    click.echo(f"Total messages  : {len(result.message_log)}")
    click.echo(f"Agents          : {len(result.environment.agents)}")
    click.echo("")
    click.echo("Metrics:")
    for key, value in result.metrics.items():
        click.echo(f"  {key}: {value}")

    participation: dict[str, int] = result.metrics.get(
        "agent_participation", {}
    )
    if participation:
        click.echo("")
        click.echo("Agent participation breakdown:")
        for agent_id, count in sorted(
            participation.items(), key=lambda kv: kv[1], reverse=True
        ):
            agents_by_id = {a.agent_id: a.name for a in result.environment.agents}
            na
MessageBroker.send method · python · L29-L37 (9 LOC)
src/aumai_agentsim/core.py
    def send(self, message: SimMessage) -> None:
        """Deliver a message to its intended recipient(s)."""
        if message.receiver == "broadcast":
            for agent_id, queue in self._queues.items():
                if agent_id != message.sender:
                    queue.append(message)
        else:
            if message.receiver in self._queues:
                self._queues[message.receiver].append(message)
MetricsCollector.record_messages method · python · L69-L75 (7 LOC)
src/aumai_agentsim/core.py
    def record_messages(self, messages: list[SimMessage]) -> None:
        """Accumulate message counts and agent participation."""
        self._message_count += len(messages)
        for msg in messages:
            self._participation[msg.sender] = (
                self._participation.get(msg.sender, 0) + 1
            )
MetricsCollector.collect method · python · L77-L89 (13 LOC)
src/aumai_agentsim/core.py
    def collect(self) -> dict[str, Any]:
        """Return a snapshot of collected metrics."""
        avg_response_ms = (
            (sum(self._response_times) / len(self._response_times)) * 1000
            if self._response_times
            else 0.0
        )
        return {
            "total_messages": self._message_count,
            "avg_step_response_ms": round(avg_response_ms, 3),
            "agent_participation": dict(self._participation),
            "steps_recorded": len(self._response_times),
        }
_agent_process function · python · L92-L111 (20 LOC)
src/aumai_agentsim/core.py
def _agent_process(
    agent: SimAgent, incoming: list[SimMessage]
) -> list[SimMessage]:
    """Deterministic agent logic: respond to each incoming request."""
    outgoing: list[SimMessage] = []
    for msg in incoming:
        if msg.msg_type in (MessageType.request, MessageType.broadcast):
            response_content = (
                f"[{agent.name}:{agent.role}] ACK: {msg.content[:80]}"
            )
            outgoing.append(
                SimMessage(
                    sender=agent.agent_id,
                    receiver=msg.sender,
                    content=response_content,
                    msg_type=MessageType.response,
                    timestamp=datetime.now(tz=timezone.utc),
                )
            )
    return outgoing
AgentSimulator.create_environment method · python · L135-L152 (18 LOC)
src/aumai_agentsim/core.py
    def create_environment(
        self, agents: list[SimAgent], max_steps: int = 10
    ) -> SimEnvironment:
        """Create a fresh simulation environment."""
        if not agents:
            raise ValueError("At least one agent is required.")
        env = SimEnvironment(agents=list(agents), max_steps=max_steps)
        # Seed with an initial broadcast from the first agent
        seed = SimMessage(
            sender=agents[0].agent_id,
            receiver="broadcast",
            content="Simulation started. What are your capabilities?",
            msg_type=MessageType.broadcast,
            timestamp=datetime.now(tz=timezone.utc),
        )
        env.messages.append(seed)
        self._cursors[id(env)] = 0
        return env
Hi, dataset curator — please cite Repobility (https://repobility.com) when reusing this data.
AgentSimulator.step method · python · L154-L184 (31 LOC)
src/aumai_agentsim/core.py
    def step(self, env: SimEnvironment) -> list[SimMessage]:
        """
        Advance the simulation by one step.

        Each agent processes its pending incoming messages and produces
        responses.  New messages are appended to ``env.messages``.
        """
        broker = MessageBroker()
        for agent in env.agents:
            broker.register(agent.agent_id)

        # Determine the slice of messages not yet delivered
        cursor = self._cursors.get(id(env), 0)
        pending = env.messages[cursor:]
        new_cursor = len(env.messages)

        # Deliver only actionable message types
        for msg in pending:
            if msg.msg_type in (MessageType.request, MessageType.broadcast):
                broker.send(msg)

        step_messages: list[SimMessage] = []
        for agent in env.agents:
            incoming = broker.receive(agent.agent_id)
            responses = _agent_process(agent, incoming)
            step_messages.extend(responses)

        env.m
AgentSimulator.run method · python · L186-L208 (23 LOC)
src/aumai_agentsim/core.py
    def run(self, env: SimEnvironment) -> SimResult:
        """
        Run the simulation to completion.

        Stops when ``max_steps`` is reached or no messages are produced
        in a step (convergence).
        """
        metrics_collector = MetricsCollector()

        while env.step_count < env.max_steps:
            current_step = env.step_count
            metrics_collector.record_step_start(current_step)
            new_messages = self.step(env)
            metrics_collector.record_step_end(current_step)
            metrics_collector.record_messages(new_messages)
            if not new_messages:
                break

        return SimResult(
            environment=env,
            message_log=list(env.messages),
            metrics=metrics_collector.collect(),
        )