← back to dream77r__my-claude-bot

Function bodies 185 total

All specs Real LLM only Function bodies
print_header function · python · L36-L41 (6 LOC)
setup.py
def print_header():
    print()
    print(bold("=" * 50))
    print(bold("  Agent Fleet Platform — Настройка"))
    print(bold("=" * 50))
    print()
check_dependencies function · python · L44-L63 (20 LOC)
setup.py
def check_dependencies():
    """Проверить и установить зависимости."""
    try:
        import yaml
        import dotenv
        import telegram
        import claude_agent_sdk
        print(green("  Зависимости установлены"))
        return True
    except ImportError:
        print(yellow("  Устанавливаю зависимости..."))
        result = subprocess.run(
            [sys.executable, "-m", "pip", "install", "-r", str(ROOT / "requirements.txt")],
            capture_output=True, text=True
        )
        if result.returncode != 0:
            print(red(f"  Ошибка установки: {result.stderr[:200]}"))
            return False
        print(green("  Зависимости установлены"))
        return True
check_claude_cli function · python · L66-L74 (9 LOC)
setup.py
def check_claude_cli():
    """Проверить что Claude CLI доступен."""
    result = subprocess.run(["which", "claude"], capture_output=True, text=True)
    if result.returncode != 0:
        print(red("  Claude CLI не найден!"))
        print("  Установи: https://docs.anthropic.com/en/docs/claude-code")
        return False
    print(green("  Claude CLI найден"))
    return True
ask_bot_token function · python · L77-L93 (17 LOC)
setup.py
def ask_bot_token() -> str:
    """Спросить токен бота."""
    print()
    print(bold("Шаг 1: Токен Telegram-бота"))
    print()
    print("  Если у тебя ещё нет бота:")
    print("  1. Открой Telegram, найди @BotFather")
    print("  2. Отправь /newbot")
    print("  3. Придумай имя и username")
    print("  4. Скопируй токен (вида 123456:ABC-DEF...)")
    print()

    while True:
        token = input(bold("  Вставь токен бота: ")).strip()
        if ":" in token and len(token) > 20:
            return token
        print(red("  Не похоже на токен. Формат: 123456:ABC-DEF..."))
ask_telegram_id function · python · L96-L111 (16 LOC)
setup.py
def ask_telegram_id() -> str:
    """Спросить Telegram ID пользователя."""
    print()
    print(bold("Шаг 2: Твой Telegram ID"))
    print()
    print("  Как узнать свой ID:")
    print("  1. Открой Telegram, найди @userinfobot")
    print("  2. Отправь ему любое сообщение")
    print("  3. Он ответит твой ID (число)")
    print()

    while True:
        uid = input(bold("  Вставь свой Telegram ID: ")).strip()
        if uid.isdigit() and len(uid) >= 5:
            return uid
        print(red("  ID должен быть числом (минимум 5 цифр)"))
ask_deepgram_key function · python · L114-L132 (19 LOC)
setup.py
def ask_deepgram_key() -> str:
    """Спросить Deepgram API ключ (опционально)."""
    print()
    print(bold("Шаг 3: Deepgram API ключ (для голосовых сообщений)"))
    print()
    print("  Бот может распознавать голосовые через Deepgram API.")
    print("  Есть бесплатный тариф на $200 кредитов (~46 000 минут).")
    print()
    print("  Как получить:")
    print("  1. Зайди на https://console.deepgram.com/")
    print("  2. Зарегистрируйся и создай API Key")
    print()

    key = input(bold("  Вставь ключ (или Enter чтобы пропустить): ")).strip()
    if key:
        print(green("  Deepgram настроен — голосовые будут работать"))
    else:
        print(yellow("  Пропущено — голосовые можно настроить позже через .env"))
    return key
write_env function · python · L135-L146 (12 LOC)
setup.py
def write_env(token: str, user_id: str, deepgram_key: str = ""):
    """Записать .env файл."""
    content = f"""# Agent Fleet Platform
ME_BOT_TOKEN={token}
FOUNDER_TELEGRAM_ID={user_id}
"""
    if deepgram_key:
        content += f"\n# Deepgram API для голосовых сообщений\nDEEPGRAM_API_KEY={deepgram_key}\n"
    ENV_FILE.write_text(content)
    ENV_FILE.chmod(0o600)  # Только владелец может читать (секреты)
    print()
    print(green("  .env файл создан (права: 600)"))
Repobility analyzer · published findings · https://repobility.com
print_ready function · python · L149-L160 (12 LOC)
setup.py
def print_ready(user_id: str):
    """Финальное сообщение."""
    print()
    print(bold("=" * 50))
    print(green(bold("  Всё готово!")))
    print(bold("=" * 50))
    print()
    print("  Сейчас бот запустится.")
    print("  Открой Telegram и напиши боту любое сообщение.")
    print()
    print(yellow("  Остановить: Ctrl+C"))
    print()
main function · python · L163-L196 (34 LOC)
setup.py
def main():
    os.chdir(ROOT)

    print_header()

    # Проверки
    print(bold("Проверяю окружение..."))
    if not check_claude_cli():
        sys.exit(1)
    if not check_dependencies():
        sys.exit(1)

    # Если .env уже есть — спросить перезаписать
    if ENV_FILE.exists():
        print()
        print(yellow(f"  .env уже существует"))
        answer = input("  Перенастроить? (y/n): ").strip().lower()
        if answer not in ("y", "yes", "д", "да"):
            print()
            print("  Запускаю с текущими настройками...")
            print()
            os.execv(sys.executable, [sys.executable, "-m", "src.main"])
            return

    # Интерактивная настройка
    token = ask_bot_token()
    user_id = ask_telegram_id()
    deepgram_key = ask_deepgram_key()
    write_env(token, user_id, deepgram_key)

    print_ready(user_id)

    # Запуск бота
    os.execv(sys.executable, [sys.executable, "-m", "src.main"])
AgentManager class · python · L107-L371 (265 LOC)
src/agent_manager.py
class AgentManager:
    """CRUD-менеджер агентов."""

    def __init__(self, root: Path):
        self.root = root
        self.agents_dir = root / "agents"
        self.env_file = root / ".env"

    def create_agent(
        self,
        name: str,
        display_name: str,
        bot_token: str,
        description: str,
        model: str = "sonnet",
        soul_md: str | None = None,
    ) -> Path:
        """
        Создать всю структуру агента + записать токен в .env.

        Args:
            name: имя агента (латиница, для папки)
            display_name: отображаемое имя (русский)
            bot_token: токен от @BotFather
            description: описание роли (одно предложение)
            model: модель Claude (haiku/sonnet/opus)
            soul_md: кастомный SOUL.md (если None — используется шаблон)

        Returns:
            Path к созданной директории агента

        Raises:
            ValueError: если параметры невалидны
            FileExistsError: если агент
__init__ method · python · L110-L113 (4 LOC)
src/agent_manager.py
    def __init__(self, root: Path):
        self.root = root
        self.agents_dir = root / "agents"
        self.env_file = root / ".env"
create_agent method · python · L115-L187 (73 LOC)
src/agent_manager.py
    def create_agent(
        self,
        name: str,
        display_name: str,
        bot_token: str,
        description: str,
        model: str = "sonnet",
        soul_md: str | None = None,
    ) -> Path:
        """
        Создать всю структуру агента + записать токен в .env.

        Args:
            name: имя агента (латиница, для папки)
            display_name: отображаемое имя (русский)
            bot_token: токен от @BotFather
            description: описание роли (одно предложение)
            model: модель Claude (haiku/sonnet/opus)
            soul_md: кастомный SOUL.md (если None — используется шаблон)

        Returns:
            Path к созданной директории агента

        Raises:
            ValueError: если параметры невалидны
            FileExistsError: если агент с таким именем уже существует
        """
        # Валидация
        errors = self._validate_create_params(name, bot_token, model)
        if errors:
            raise ValueError("; ".join(error
list_agents method · python · L189-L234 (46 LOC)
src/agent_manager.py
    def list_agents(self) -> list[dict]:
        """
        Вернуть список агентов.

        Returns:
            Список словарей: name, display_name, model, token_set (bool)
        """
        if not self.agents_dir.exists():
            return []

        # Прочитать .env для проверки токенов
        env_vars = self._read_env_vars()

        agents = []
        for agent_yaml in sorted(self.agents_dir.glob("*/agent.yaml")):
            try:
                with open(agent_yaml, encoding="utf-8") as f:
                    raw = f.read()
                config = yaml.safe_load(raw)

                name = config.get("name", agent_yaml.parent.name)
                display_name = config.get("display_name", name)
                model = config.get("claude_model", "sonnet")

                # Определить имя env-переменной из bot_token
                token_ref = config.get("bot_token", "")
                token_set = False
                if isinstance(token_ref, str):
                  
validate_agent method · python · L236-L290 (55 LOC)
src/agent_manager.py
    def validate_agent(self, agent_dir: Path) -> tuple[bool, list[str]]:
        """
        Проверить agent.yaml и SOUL.md.

        Returns:
            (ok, список ошибок)
        """
        errors = []
        yaml_path = agent_dir / "agent.yaml"
        soul_path = agent_dir / "SOUL.md"

        # agent.yaml существует
        if not yaml_path.exists():
            errors.append("agent.yaml не найден")
            return False, errors

        # Парсинг YAML
        try:
            with open(yaml_path, encoding="utf-8") as f:
                raw = f.read()
            config = yaml.safe_load(raw)
        except yaml.YAMLError as e:
            errors.append(f"Ошибка парсинга YAML: {e}")
            return False, errors

        if not isinstance(config, dict):
            errors.append("agent.yaml должен быть словарём")
            return False, errors

        # Обязательные поля
        for field in ("name", "bot_token"):
            if field not in config:
                err
validate_all method · python · L292-L302 (11 LOC)
src/agent_manager.py
    def validate_all(self) -> dict[str, tuple[bool, list[str]]]:
        """Проверить всех агентов."""
        results = {}
        if not self.agents_dir.exists():
            return results

        for agent_dir in sorted(self.agents_dir.iterdir()):
            if agent_dir.is_dir() and (agent_dir / "agent.yaml").exists():
                results[agent_dir.name] = self.validate_agent(agent_dir)

        return results
Same scanner, your repo: https://repobility.com — Repobility
_validate_create_params method · python · L304-L329 (26 LOC)
src/agent_manager.py
    def _validate_create_params(
        self, name: str, bot_token: str, model: str
    ) -> list[str]:
        """Валидация параметров для create_agent."""
        errors = []

        if not name:
            errors.append("Имя агента не может быть пустым")
        elif not AGENT_NAME_RE.match(name):
            errors.append(
                "Имя агента: только латиница, цифры, дефис, подчёркивание. "
                "Начинается с буквы"
            )

        if not bot_token:
            errors.append("Токен бота не может быть пустым")
        elif not BOT_TOKEN_RE.match(bot_token):
            errors.append(
                "Невалидный токен бота. Формат: цифры:буквы "
                "(получить у @BotFather в Telegram)"
            )

        if model not in ("haiku", "sonnet", "opus"):
            errors.append(f"Неизвестная модель: {model}. Доступны: haiku, sonnet, opus")

        return errors
_read_env_vars method · python · L331-L343 (13 LOC)
src/agent_manager.py
    def _read_env_vars(self) -> dict[str, str]:
        """Прочитать .env файл в словарь."""
        env = {}
        if not self.env_file.exists():
            return env
        for line in self.env_file.read_text(encoding="utf-8").splitlines():
            line = line.strip()
            if not line or line.startswith("#"):
                continue
            if "=" in line:
                key, _, value = line.partition("=")
                env[key.strip()] = value.strip()
        return env
_add_env_var method · python · L345-L371 (27 LOC)
src/agent_manager.py
    def _add_env_var(self, var_name: str, value: str) -> None:
        """Добавить переменную в .env файл."""
        lines = []
        if self.env_file.exists():
            lines = self.env_file.read_text(encoding="utf-8").splitlines()

        # Проверить, не существует ли уже
        for i, line in enumerate(lines):
            if line.strip().startswith(f"{var_name}="):
                lines[i] = f"{var_name}={value}"
                self.env_file.write_text(
                    "\n".join(lines) + "\n", encoding="utf-8"
                )
                return

        # Добавить новую переменную
        # Обеспечить пустую строку перед новой записью
        if lines and lines[-1].strip():
            lines.append("")
        lines.append(f"{var_name}={value}")

        self.env_file.write_text("\n".join(lines) + "\n", encoding="utf-8")
        self.env_file.chmod(0o600)  # Только владелец может читать
        logger.info(f"Добавлен {var_name} в .env")

        # Сразу установить
Agent class · python · L37-L428 (392 LOC)
src/agent.py
class Agent:
    def __init__(self, config_path: str):
        """
        Инициализация агента из YAML конфига.

        Args:
            config_path: путь к agent.yaml
        """
        self.config_path = Path(config_path)
        self.agent_dir = str(self.config_path.parent)
        self.config = self._load_config()

        self.name: str = self.config["name"]
        self.display_name: str = self.config.get("display_name", self.name)
        self.bot_token: str = self.config["bot_token"]
        self.system_prompt_template: str = self.config.get("system_prompt", "")
        self.memory_path: str = self.config.get("memory_path", f"./agents/{self.name}/memory/")
        self.skill_names: list[str] = self.config.get("skills", [])
        self.allowed_users: list[int] = self._parse_allowed_users()
        self.max_context_messages: int = self.config.get("max_context_messages", 50)
        self.claude_model: str = self.config.get("claude_model", "sonnet")
        self.claude_flags: 
__init__ method · python · L38-L70 (33 LOC)
src/agent.py
    def __init__(self, config_path: str):
        """
        Инициализация агента из YAML конфига.

        Args:
            config_path: путь к agent.yaml
        """
        self.config_path = Path(config_path)
        self.agent_dir = str(self.config_path.parent)
        self.config = self._load_config()

        self.name: str = self.config["name"]
        self.display_name: str = self.config.get("display_name", self.name)
        self.bot_token: str = self.config["bot_token"]
        self.system_prompt_template: str = self.config.get("system_prompt", "")
        self.memory_path: str = self.config.get("memory_path", f"./agents/{self.name}/memory/")
        self.skill_names: list[str] = self.config.get("skills", [])
        self.allowed_users: list[int] = self._parse_allowed_users()
        self.max_context_messages: int = self.config.get("max_context_messages", 50)
        self.claude_model: str = self.config.get("claude_model", "sonnet")
        self.claude_flags: list[str] = s
_load_config method · python · L72-L79 (8 LOC)
src/agent.py
    def _load_config(self) -> dict:
        """Загрузить и обработать YAML конфиг с expandvars."""
        with open(self.config_path, encoding="utf-8") as f:
            raw = f.read()

        # expandvars для секретов (${ME_BOT_TOKEN} → значение из .env)
        expanded = os.path.expandvars(raw)
        return yaml.safe_load(expanded)
_parse_allowed_users method · python · L81-L90 (10 LOC)
src/agent.py
    def _parse_allowed_users(self) -> list[int]:
        """Разобрать список allowed_users, преобразовать в int."""
        raw = self.config.get("allowed_users", [])
        result = []
        for u in raw:
            try:
                result.append(int(u))
            except (ValueError, TypeError):
                logger.warning(f"Невалидный user ID: {u}")
        return result
is_user_allowed method · python · L92-L96 (5 LOC)
src/agent.py
    def is_user_allowed(self, user_id: int) -> bool:
        """Проверить, имеет ли пользователь доступ."""
        if not self.allowed_users:
            return True  # Если список пуст — доступ всем
        return user_id in self.allowed_users
Want fix-PRs on findings? Install Repobility's GitHub App · github.com/apps/repobility-bot
parse_skill_frontmatter method · python · L99-L114 (16 LOC)
src/agent.py
    def parse_skill_frontmatter(text: str) -> tuple[dict | None, str]:
        """
        Разобрать YAML frontmatter из skill файла.

        Returns:
            (metadata dict или None, тело скилла без frontmatter)
        """
        match = re.match(r"^---\s*\n(.*?)\n---\s*\n", text, re.DOTALL)
        if not match:
            return None, text
        try:
            meta = yaml.safe_load(match.group(1))
        except yaml.YAMLError:
            return None, text
        body = text[match.end():]
        return meta, body
check_skill_requirements method · python · L117-L132 (16 LOC)
src/agent.py
    def check_skill_requirements(meta: dict) -> tuple[bool, list[str]]:
        """
        Проверить зависимости скилла.

        Returns:
            (ok, список ошибок)
        """
        errors = []
        reqs = meta.get("requirements", {})
        for cmd in reqs.get("commands", []):
            if not shutil.which(cmd):
                errors.append(f"команда '{cmd}' не найдена")
        for env_var in reqs.get("env", []):
            if not os.environ.get(env_var):
                errors.append(f"переменная '{env_var}' не задана")
        return len(errors) == 0, errors
_load_skills method · python · L134-L173 (40 LOC)
src/agent.py
    def _load_skills(self) -> str:
        """
        Загрузить скиллы из agents/{name}/skills/*.md.

        Поддерживает YAML frontmatter:
        - description: описание скилла
        - requirements.commands: проверка через shutil.which()
        - requirements.env: проверка через os.environ
        - always: true = всегда в system prompt (иначе по имени из agent.yaml)
        """
        skills_dir = Path(self.agent_dir) / "skills"
        if not skills_dir.exists():
            return ""

        parts = []
        for skill_file in sorted(skills_dir.glob("*.md")):
            raw = skill_file.read_text(encoding="utf-8")
            meta, body = self.parse_skill_frontmatter(raw)

            skill_name = skill_file.stem

            # Фильтрация: если не always и не в списке skills из yaml — пропустить
            if meta and not meta.get("always", False):
                if self.skill_names and skill_name not in self.skill_names:
                    continue

            # Пров
_load_soul method · python · L175-L180 (6 LOC)
src/agent.py
    def _load_soul(self) -> str:
        """Загрузить SOUL.md из директории агента."""
        soul_path = Path(self.agent_dir) / "SOUL.md"
        if soul_path.exists():
            return soul_path.read_text(encoding="utf-8")
        return ""
build_system_prompt method · python · L182-L213 (32 LOC)
src/agent.py
    def build_system_prompt(self) -> str:
        """
        Собрать полный system prompt:
        SOUL.md + system_prompt из YAML + skills + memory context
        """
        parts = []

        # 1. SOUL.md — личность агента
        soul = self._load_soul()
        if soul:
            parts.append(soul)

        # 2. System prompt из agent.yaml
        if self.system_prompt_template:
            parts.append(self.system_prompt_template)

        # 3. Скиллы
        skills = self._load_skills()
        if skills:
            parts.append("## Скиллы\n\n" + skills)

        # 4. Контекст из памяти (profile.md, index.md, daily note)
        ctx = memory.read_context(self.agent_dir)
        if ctx:
            parts.append("## Контекст из памяти\n\n" + ctx)

        # 5. Языковая инструкция
        lang = memory.get_setting(self.agent_dir, "language")
        if lang:
            parts.append(t("system_lang_instruction", lang))

        return "\n\n---\n\n".join(parts)
build_group_system_prompt method · python · L215-L258 (44 LOC)
src/agent.py
    def build_group_system_prompt(self, chat_id: int) -> str:
        """
        Собрать system prompt для группового чата.

        Отличия от build_system_prompt:
        - Вместо profile.md → groups/{chat_id}/context.md
        - Вместо личного daily → groups/{chat_id}/daily/
        - Добавляет инструкцию для группового режима
        - НЕ включает личную wiki/profile владельца
        """
        parts = []

        # 1. SOUL.md — личность (общая)
        soul = self._load_soul()
        if soul:
            parts.append(soul)

        # 2. System prompt из agent.yaml (общий)
        if self.system_prompt_template:
            parts.append(self.system_prompt_template)

        # 3. Инструкция для группового режима
        group_instructions = (
            "## Режим группового чата\n\n"
            "Ты в групповом чате. Правила:\n"
            "- Отвечай только когда к тебе обращаются\n"
            "- Учитывай контекст предыдущих сообщений (они в логе ниже)\n"
            "- Обр
_parse_allowed_tools method · python · L260-L266 (7 LOC)
src/agent.py
    def _parse_allowed_tools(self) -> list[str] | None:
        """Извлечь allowed tools из claude_flags."""
        flags = self.claude_flags
        for i, flag in enumerate(flags):
            if flag == "--allowedTools" and i + 1 < len(flags):
                return flags[i + 1].split(",")
        return None
call_claude method · python · L268-L428 (161 LOC)
src/agent.py
    async def call_claude(
        self,
        message: str,
        files: list[str] | None = None,
        semaphore: asyncio.Semaphore | None = None,
        on_tool_use: Callable[[str], Awaitable[None]] | None = None,
        on_text_delta: Callable[[str], Awaitable[None]] | None = None,
        group_chat_id: int | None = None,
    ) -> str:
        """
        Вызвать Claude через claude-agent-sdk.

        Args:
            message: текст сообщения от пользователя
            files: список путей к файлам (будут упомянуты в промпте)
            semaphore: глобальный семафор для ограничения параллельных вызовов
            on_tool_use: async callback, вызывается с tool hint строкой
                         при каждом использовании инструмента
            on_text_delta: async callback, вызывается при каждом TextBlock
                           для streaming ответа
            group_chat_id: ID группового чата (None для DM)

        Returns:
            Текстовый ответ от Claude
 
Repobility's GitHub App fixes findings like these · https://github.com/apps/repobility-bot
AgentWorker class · python · L20-L157 (138 LOC)
src/agent_worker.py
class AgentWorker:
    """
    Воркер агента: читает из bus → call_claude → публикует ответ.

    Одновременно обрабатывает по одному сообщению на chat_id
    (сериализация через _active набор).
    """

    def __init__(
        self,
        agent: "Agent",
        bus: FleetBus,
        semaphore: asyncio.Semaphore,
    ):
        self.agent = agent
        self.bus = bus
        self.semaphore = semaphore
        self._queue_name = f"agent:{agent.name}"
        self._running = False
        # Активные chat_id → Task (для /stop)
        self._active_tasks: dict[int, asyncio.Task] = {}

    def cancel_task(self, chat_id: int) -> bool:
        """Отменить активную задачу для chat_id (для /stop)."""
        task = self._active_tasks.get(chat_id)
        if task and not task.done():
            task.cancel()
            self._active_tasks.pop(chat_id, None)
            return True
        return False

    async def run(self) -> None:
        """Основной цикл: читать из bus, обрабатыват
__init__ method · python · L28-L40 (13 LOC)
src/agent_worker.py
    def __init__(
        self,
        agent: "Agent",
        bus: FleetBus,
        semaphore: asyncio.Semaphore,
    ):
        self.agent = agent
        self.bus = bus
        self.semaphore = semaphore
        self._queue_name = f"agent:{agent.name}"
        self._running = False
        # Активные chat_id → Task (для /stop)
        self._active_tasks: dict[int, asyncio.Task] = {}
cancel_task method · python · L42-L49 (8 LOC)
src/agent_worker.py
    def cancel_task(self, chat_id: int) -> bool:
        """Отменить активную задачу для chat_id (для /stop)."""
        task = self._active_tasks.get(chat_id)
        if task and not task.done():
            task.cancel()
            self._active_tasks.pop(chat_id, None)
            return True
        return False
run method · python · L51-L69 (19 LOC)
src/agent_worker.py
    async def run(self) -> None:
        """Основной цикл: читать из bus, обрабатывать, отвечать."""
        self._running = True
        logger.info(f"AgentWorker '{self.agent.name}' запущен")

        while self._running:
            try:
                msg = await self.bus.consume(self._queue_name)
                # Обработать в отдельной задаче (не блокируем очередь)
                task = asyncio.create_task(self._handle_message(msg))
                if msg.chat_id:
                    self._active_tasks[msg.chat_id] = task
            except asyncio.CancelledError:
                logger.info(f"AgentWorker '{self.agent.name}' остановлен")
                break
            except Exception as e:
                logger.error(f"AgentWorker '{self.agent.name}' error: {e}")

        self._running = False
_handle_message method · python · L71-L153 (83 LOC)
src/agent_worker.py
    async def _handle_message(self, msg: FleetMessage) -> None:
        """Обработать одно входящее сообщение."""
        chat_id = msg.chat_id
        # Пробросить thread_id через все ответные сообщения
        thread_id = msg.metadata.get("message_thread_id")
        base_meta = {"message_thread_id": thread_id} if thread_id else {}

        # Уведомить bridge что начали обработку
        await self.bus.publish(FleetMessage(
            source=f"agent:{self.agent.name}",
            target=f"telegram:{self.agent.name}",
            content="",
            msg_type=MessageType.SYSTEM,
            chat_id=chat_id,
            metadata={**base_meta, "event": "processing_started"},
        ))

        try:
            # Колбек для tool hints — пересылаем через bus
            async def on_tool_use(hint: str):
                await self.bus.publish(FleetMessage(
                    source=f"agent:{self.agent.name}",
                    target=f"telegram:{self.agent.name}",
                
stop method · python · L155-L157 (3 LOC)
src/agent_worker.py
    def stop(self) -> None:
        """Остановить воркер."""
        self._running = False
MessageType class · python · L21-L26 (6 LOC)
src/bus.py
class MessageType(Enum):
    """Типы сообщений в шине."""
    INBOUND = "inbound"          # Входящее от пользователя
    OUTBOUND = "outbound"        # Ответ агента → канал
    AGENT_TO_AGENT = "a2a"       # Между агентами
    SYSTEM = "system"            # Системные (heartbeat, dream, etc.)
FleetMessage class · python · L30-L42 (13 LOC)
src/bus.py
class FleetMessage:
    """Единый формат сообщения в шине."""
    source: str            # "telegram", "agent:me", "system"
    target: str            # "orchestrator", "agent:coder", "telegram:{chat_id}"
    content: str
    msg_type: MessageType = MessageType.INBOUND
    session_id: str = ""
    chat_id: int = 0
    user_id: int = 0
    files: list[str] = field(default_factory=list)
    metadata: dict = field(default_factory=dict)
    id: str = field(default_factory=lambda: uuid.uuid4().hex[:12])
    timestamp: float = field(default_factory=time.time)
Repobility analyzer · published findings · https://repobility.com
FleetBus class · python · L45-L148 (104 LOC)
src/bus.py
class FleetBus:
    """
    Pub/sub шина сообщений на asyncio.Queue.

    Подписчики регистрируются по имени (например "orchestrator", "agent:me").
    Сообщения доставляются по полю target:
    - Точный адрес: "agent:me" → очередь "agent:me"
    - Broadcast: "*" → все подписчики
    - Prefix: "agent:*" → все подписчики с префиксом "agent:"
    """

    def __init__(self):
        self._queues: dict[str, asyncio.Queue] = {}
        self._running = False

    def subscribe(self, name: str, maxsize: int = 100) -> asyncio.Queue:
        """
        Зарегистрировать подписчика.

        Args:
            name: уникальное имя ("orchestrator", "agent:me", "telegram")
            maxsize: размер очереди

        Returns:
            asyncio.Queue для чтения сообщений
        """
        if name in self._queues:
            return self._queues[name]
        q: asyncio.Queue = asyncio.Queue(maxsize=maxsize)
        self._queues[name] = q
        logger.info(f"Bus: подписчик '{name}' зарегистрир
__init__ method · python · L56-L58 (3 LOC)
src/bus.py
    def __init__(self):
        self._queues: dict[str, asyncio.Queue] = {}
        self._running = False
subscribe method · python · L60-L76 (17 LOC)
src/bus.py
    def subscribe(self, name: str, maxsize: int = 100) -> asyncio.Queue:
        """
        Зарегистрировать подписчика.

        Args:
            name: уникальное имя ("orchestrator", "agent:me", "telegram")
            maxsize: размер очереди

        Returns:
            asyncio.Queue для чтения сообщений
        """
        if name in self._queues:
            return self._queues[name]
        q: asyncio.Queue = asyncio.Queue(maxsize=maxsize)
        self._queues[name] = q
        logger.info(f"Bus: подписчик '{name}' зарегистрирован")
        return q
unsubscribe method · python · L78-L81 (4 LOC)
src/bus.py
    def unsubscribe(self, name: str) -> None:
        """Удалить подписчика."""
        self._queues.pop(name, None)
        logger.info(f"Bus: подписчик '{name}' удалён")
publish method · python · L83-L115 (33 LOC)
src/bus.py
    async def publish(self, msg: FleetMessage) -> int:
        """
        Опубликовать сообщение в шину.

        Маршрутизация по msg.target:
        - "*" → все подписчики
        - "agent:*" → все agent:*
        - "agent:me" → точно agent:me

        Returns:
            Количество получателей
        """
        delivered = 0
        target = msg.target

        for name, queue in self._queues.items():
            if self._matches(name, target):
                try:
                    queue.put_nowait(msg)
                    delivered += 1
                except asyncio.QueueFull:
                    logger.warning(
                        f"Bus: очередь '{name}' переполнена, "
                        f"сообщение от '{msg.source}' потеряно"
                    )

        if delivered == 0:
            logger.warning(
                f"Bus: нет получателей для target='{target}' "
                f"(source='{msg.source}')"
            )

        return delivered
consume method · python · L117-L133 (17 LOC)
src/bus.py
    async def consume(self, name: str) -> FleetMessage:
        """
        Прочитать одно сообщение из очереди подписчика.

        Args:
            name: имя подписчика

        Returns:
            FleetMessage

        Raises:
            KeyError: если подписчик не зарегистрирован
        """
        queue = self._queues.get(name)
        if queue is None:
            raise KeyError(f"Подписчик '{name}' не зарегистрирован")
        return await queue.get()
_matches method · python · L141-L148 (8 LOC)
src/bus.py
    def _matches(subscriber: str, target: str) -> bool:
        """Проверить, подходит ли подписчик под target."""
        if target == "*":
            return True
        if target.endswith(":*"):
            prefix = target[:-1]  # "agent:*" → "agent:"
            return subscriber.startswith(prefix)
        return subscriber == target
find_root function · python · L17-L23 (7 LOC)
src/cli.py
def find_root() -> Path:
    """Найти корень проекта."""
    current = Path.cwd()
    for parent in [current] + list(current.parents):
        if (parent / "agents").is_dir():
            return parent
    return Path(__file__).parent.parent
Same scanner, your repo: https://repobility.com — Repobility
cmd_list_agents function · python · L26-L46 (21 LOC)
src/cli.py
def cmd_list_agents(manager: AgentManager) -> None:
    """Таблица всех агентов."""
    agents = manager.list_agents()

    if not agents:
        print("Агенты не найдены.")
        return

    # Форматирование таблицы
    header = f"{'Имя':<15} {'Название':<25} {'Модель':<10} {'Токен':<10}"
    print(header)
    print("-" * len(header))

    for a in agents:
        token_status = "✓ задан" if a["token_set"] else "✗ нет"
        print(
            f"{a['name']:<15} {a['display_name']:<25} "
            f"{a['model']:<10} {token_status:<10}"
        )

    print(f"\nВсего: {len(agents)} агентов")
cmd_validate function · python · L49-L70 (22 LOC)
src/cli.py
def cmd_validate(manager: AgentManager) -> None:
    """Валидация всех агентов."""
    results = manager.validate_all()

    if not results:
        print("Агенты не найдены.")
        return

    all_ok = True
    for name, (ok, errors) in results.items():
        if ok:
            print(f"  ✓ {name}")
        else:
            all_ok = False
            print(f"  ✗ {name}:")
            for err in errors:
                print(f"      - {err}")

    if all_ok:
        print(f"\nВсе {len(results)} агентов валидны.")
    else:
        print("\nЕсть ошибки. Исправь и запусти снова.")
cmd_create_agent function · python · L73-L138 (66 LOC)
src/cli.py
def cmd_create_agent(manager: AgentManager) -> None:
    """Интерактивный визард создания агента."""
    print("=== Создание нового агента ===\n")

    # 1. Имя
    while True:
        name = input("Имя агента (латиницей, для папки): ").strip().lower()
        if not name:
            print("  Имя не может быть пустым.")
            continue
        if (manager.agents_dir / name).exists():
            print(f"  Агент '{name}' уже существует.")
            continue
        break

    # 2. Отображаемое имя
    display_name = input("Отображаемое имя (на русском): ").strip()
    if not display_name:
        display_name = name.title()

    # 3. Токен бота
    while True:
        bot_token = input("Токен бота (от @BotFather): ").strip()
        if not bot_token:
            print("  Токен не может быть пустым.")
            continue
        break

    # 4. Описание роли
    description = input("Описание роли (одно предложение): ").strip()
    if not description:
        description = f"AI-а
page 1 / 4next ›