Function bodies 185 total
print_header function · python · L36-L41 (6 LOC)setup.py
def print_header():
print()
print(bold("=" * 50))
print(bold(" Agent Fleet Platform — Настройка"))
print(bold("=" * 50))
print()check_dependencies function · python · L44-L63 (20 LOC)setup.py
def check_dependencies():
"""Проверить и установить зависимости."""
try:
import yaml
import dotenv
import telegram
import claude_agent_sdk
print(green(" Зависимости установлены"))
return True
except ImportError:
print(yellow(" Устанавливаю зависимости..."))
result = subprocess.run(
[sys.executable, "-m", "pip", "install", "-r", str(ROOT / "requirements.txt")],
capture_output=True, text=True
)
if result.returncode != 0:
print(red(f" Ошибка установки: {result.stderr[:200]}"))
return False
print(green(" Зависимости установлены"))
return Truecheck_claude_cli function · python · L66-L74 (9 LOC)setup.py
def check_claude_cli():
"""Проверить что Claude CLI доступен."""
result = subprocess.run(["which", "claude"], capture_output=True, text=True)
if result.returncode != 0:
print(red(" Claude CLI не найден!"))
print(" Установи: https://docs.anthropic.com/en/docs/claude-code")
return False
print(green(" Claude CLI найден"))
return Trueask_bot_token function · python · L77-L93 (17 LOC)setup.py
def ask_bot_token() -> str:
"""Спросить токен бота."""
print()
print(bold("Шаг 1: Токен Telegram-бота"))
print()
print(" Если у тебя ещё нет бота:")
print(" 1. Открой Telegram, найди @BotFather")
print(" 2. Отправь /newbot")
print(" 3. Придумай имя и username")
print(" 4. Скопируй токен (вида 123456:ABC-DEF...)")
print()
while True:
token = input(bold(" Вставь токен бота: ")).strip()
if ":" in token and len(token) > 20:
return token
print(red(" Не похоже на токен. Формат: 123456:ABC-DEF..."))ask_telegram_id function · python · L96-L111 (16 LOC)setup.py
def ask_telegram_id() -> str:
"""Спросить Telegram ID пользователя."""
print()
print(bold("Шаг 2: Твой Telegram ID"))
print()
print(" Как узнать свой ID:")
print(" 1. Открой Telegram, найди @userinfobot")
print(" 2. Отправь ему любое сообщение")
print(" 3. Он ответит твой ID (число)")
print()
while True:
uid = input(bold(" Вставь свой Telegram ID: ")).strip()
if uid.isdigit() and len(uid) >= 5:
return uid
print(red(" ID должен быть числом (минимум 5 цифр)"))ask_deepgram_key function · python · L114-L132 (19 LOC)setup.py
def ask_deepgram_key() -> str:
"""Спросить Deepgram API ключ (опционально)."""
print()
print(bold("Шаг 3: Deepgram API ключ (для голосовых сообщений)"))
print()
print(" Бот может распознавать голосовые через Deepgram API.")
print(" Есть бесплатный тариф на $200 кредитов (~46 000 минут).")
print()
print(" Как получить:")
print(" 1. Зайди на https://console.deepgram.com/")
print(" 2. Зарегистрируйся и создай API Key")
print()
key = input(bold(" Вставь ключ (или Enter чтобы пропустить): ")).strip()
if key:
print(green(" Deepgram настроен — голосовые будут работать"))
else:
print(yellow(" Пропущено — голосовые можно настроить позже через .env"))
return keywrite_env function · python · L135-L146 (12 LOC)setup.py
def write_env(token: str, user_id: str, deepgram_key: str = ""):
"""Записать .env файл."""
content = f"""# Agent Fleet Platform
ME_BOT_TOKEN={token}
FOUNDER_TELEGRAM_ID={user_id}
"""
if deepgram_key:
content += f"\n# Deepgram API для голосовых сообщений\nDEEPGRAM_API_KEY={deepgram_key}\n"
ENV_FILE.write_text(content)
ENV_FILE.chmod(0o600) # Только владелец может читать (секреты)
print()
print(green(" .env файл создан (права: 600)"))Repobility analyzer · published findings · https://repobility.com
print_ready function · python · L149-L160 (12 LOC)setup.py
def print_ready(user_id: str):
"""Финальное сообщение."""
print()
print(bold("=" * 50))
print(green(bold(" Всё готово!")))
print(bold("=" * 50))
print()
print(" Сейчас бот запустится.")
print(" Открой Telegram и напиши боту любое сообщение.")
print()
print(yellow(" Остановить: Ctrl+C"))
print()main function · python · L163-L196 (34 LOC)setup.py
def main():
os.chdir(ROOT)
print_header()
# Проверки
print(bold("Проверяю окружение..."))
if not check_claude_cli():
sys.exit(1)
if not check_dependencies():
sys.exit(1)
# Если .env уже есть — спросить перезаписать
if ENV_FILE.exists():
print()
print(yellow(f" .env уже существует"))
answer = input(" Перенастроить? (y/n): ").strip().lower()
if answer not in ("y", "yes", "д", "да"):
print()
print(" Запускаю с текущими настройками...")
print()
os.execv(sys.executable, [sys.executable, "-m", "src.main"])
return
# Интерактивная настройка
token = ask_bot_token()
user_id = ask_telegram_id()
deepgram_key = ask_deepgram_key()
write_env(token, user_id, deepgram_key)
print_ready(user_id)
# Запуск бота
os.execv(sys.executable, [sys.executable, "-m", "src.main"])AgentManager class · python · L107-L371 (265 LOC)src/agent_manager.py
class AgentManager:
"""CRUD-менеджер агентов."""
def __init__(self, root: Path):
self.root = root
self.agents_dir = root / "agents"
self.env_file = root / ".env"
def create_agent(
self,
name: str,
display_name: str,
bot_token: str,
description: str,
model: str = "sonnet",
soul_md: str | None = None,
) -> Path:
"""
Создать всю структуру агента + записать токен в .env.
Args:
name: имя агента (латиница, для папки)
display_name: отображаемое имя (русский)
bot_token: токен от @BotFather
description: описание роли (одно предложение)
model: модель Claude (haiku/sonnet/opus)
soul_md: кастомный SOUL.md (если None — используется шаблон)
Returns:
Path к созданной директории агента
Raises:
ValueError: если параметры невалидны
FileExistsError: если агент__init__ method · python · L110-L113 (4 LOC)src/agent_manager.py
def __init__(self, root: Path):
self.root = root
self.agents_dir = root / "agents"
self.env_file = root / ".env"create_agent method · python · L115-L187 (73 LOC)src/agent_manager.py
def create_agent(
self,
name: str,
display_name: str,
bot_token: str,
description: str,
model: str = "sonnet",
soul_md: str | None = None,
) -> Path:
"""
Создать всю структуру агента + записать токен в .env.
Args:
name: имя агента (латиница, для папки)
display_name: отображаемое имя (русский)
bot_token: токен от @BotFather
description: описание роли (одно предложение)
model: модель Claude (haiku/sonnet/opus)
soul_md: кастомный SOUL.md (если None — используется шаблон)
Returns:
Path к созданной директории агента
Raises:
ValueError: если параметры невалидны
FileExistsError: если агент с таким именем уже существует
"""
# Валидация
errors = self._validate_create_params(name, bot_token, model)
if errors:
raise ValueError("; ".join(errorlist_agents method · python · L189-L234 (46 LOC)src/agent_manager.py
def list_agents(self) -> list[dict]:
"""
Вернуть список агентов.
Returns:
Список словарей: name, display_name, model, token_set (bool)
"""
if not self.agents_dir.exists():
return []
# Прочитать .env для проверки токенов
env_vars = self._read_env_vars()
agents = []
for agent_yaml in sorted(self.agents_dir.glob("*/agent.yaml")):
try:
with open(agent_yaml, encoding="utf-8") as f:
raw = f.read()
config = yaml.safe_load(raw)
name = config.get("name", agent_yaml.parent.name)
display_name = config.get("display_name", name)
model = config.get("claude_model", "sonnet")
# Определить имя env-переменной из bot_token
token_ref = config.get("bot_token", "")
token_set = False
if isinstance(token_ref, str):
validate_agent method · python · L236-L290 (55 LOC)src/agent_manager.py
def validate_agent(self, agent_dir: Path) -> tuple[bool, list[str]]:
"""
Проверить agent.yaml и SOUL.md.
Returns:
(ok, список ошибок)
"""
errors = []
yaml_path = agent_dir / "agent.yaml"
soul_path = agent_dir / "SOUL.md"
# agent.yaml существует
if not yaml_path.exists():
errors.append("agent.yaml не найден")
return False, errors
# Парсинг YAML
try:
with open(yaml_path, encoding="utf-8") as f:
raw = f.read()
config = yaml.safe_load(raw)
except yaml.YAMLError as e:
errors.append(f"Ошибка парсинга YAML: {e}")
return False, errors
if not isinstance(config, dict):
errors.append("agent.yaml должен быть словарём")
return False, errors
# Обязательные поля
for field in ("name", "bot_token"):
if field not in config:
errvalidate_all method · python · L292-L302 (11 LOC)src/agent_manager.py
def validate_all(self) -> dict[str, tuple[bool, list[str]]]:
"""Проверить всех агентов."""
results = {}
if not self.agents_dir.exists():
return results
for agent_dir in sorted(self.agents_dir.iterdir()):
if agent_dir.is_dir() and (agent_dir / "agent.yaml").exists():
results[agent_dir.name] = self.validate_agent(agent_dir)
return resultsSame scanner, your repo: https://repobility.com — Repobility
_validate_create_params method · python · L304-L329 (26 LOC)src/agent_manager.py
def _validate_create_params(
self, name: str, bot_token: str, model: str
) -> list[str]:
"""Валидация параметров для create_agent."""
errors = []
if not name:
errors.append("Имя агента не может быть пустым")
elif not AGENT_NAME_RE.match(name):
errors.append(
"Имя агента: только латиница, цифры, дефис, подчёркивание. "
"Начинается с буквы"
)
if not bot_token:
errors.append("Токен бота не может быть пустым")
elif not BOT_TOKEN_RE.match(bot_token):
errors.append(
"Невалидный токен бота. Формат: цифры:буквы "
"(получить у @BotFather в Telegram)"
)
if model not in ("haiku", "sonnet", "opus"):
errors.append(f"Неизвестная модель: {model}. Доступны: haiku, sonnet, opus")
return errors_read_env_vars method · python · L331-L343 (13 LOC)src/agent_manager.py
def _read_env_vars(self) -> dict[str, str]:
"""Прочитать .env файл в словарь."""
env = {}
if not self.env_file.exists():
return env
for line in self.env_file.read_text(encoding="utf-8").splitlines():
line = line.strip()
if not line or line.startswith("#"):
continue
if "=" in line:
key, _, value = line.partition("=")
env[key.strip()] = value.strip()
return env_add_env_var method · python · L345-L371 (27 LOC)src/agent_manager.py
def _add_env_var(self, var_name: str, value: str) -> None:
"""Добавить переменную в .env файл."""
lines = []
if self.env_file.exists():
lines = self.env_file.read_text(encoding="utf-8").splitlines()
# Проверить, не существует ли уже
for i, line in enumerate(lines):
if line.strip().startswith(f"{var_name}="):
lines[i] = f"{var_name}={value}"
self.env_file.write_text(
"\n".join(lines) + "\n", encoding="utf-8"
)
return
# Добавить новую переменную
# Обеспечить пустую строку перед новой записью
if lines and lines[-1].strip():
lines.append("")
lines.append(f"{var_name}={value}")
self.env_file.write_text("\n".join(lines) + "\n", encoding="utf-8")
self.env_file.chmod(0o600) # Только владелец может читать
logger.info(f"Добавлен {var_name} в .env")
# Сразу установитьAgent class · python · L37-L428 (392 LOC)src/agent.py
class Agent:
def __init__(self, config_path: str):
"""
Инициализация агента из YAML конфига.
Args:
config_path: путь к agent.yaml
"""
self.config_path = Path(config_path)
self.agent_dir = str(self.config_path.parent)
self.config = self._load_config()
self.name: str = self.config["name"]
self.display_name: str = self.config.get("display_name", self.name)
self.bot_token: str = self.config["bot_token"]
self.system_prompt_template: str = self.config.get("system_prompt", "")
self.memory_path: str = self.config.get("memory_path", f"./agents/{self.name}/memory/")
self.skill_names: list[str] = self.config.get("skills", [])
self.allowed_users: list[int] = self._parse_allowed_users()
self.max_context_messages: int = self.config.get("max_context_messages", 50)
self.claude_model: str = self.config.get("claude_model", "sonnet")
self.claude_flags: __init__ method · python · L38-L70 (33 LOC)src/agent.py
def __init__(self, config_path: str):
"""
Инициализация агента из YAML конфига.
Args:
config_path: путь к agent.yaml
"""
self.config_path = Path(config_path)
self.agent_dir = str(self.config_path.parent)
self.config = self._load_config()
self.name: str = self.config["name"]
self.display_name: str = self.config.get("display_name", self.name)
self.bot_token: str = self.config["bot_token"]
self.system_prompt_template: str = self.config.get("system_prompt", "")
self.memory_path: str = self.config.get("memory_path", f"./agents/{self.name}/memory/")
self.skill_names: list[str] = self.config.get("skills", [])
self.allowed_users: list[int] = self._parse_allowed_users()
self.max_context_messages: int = self.config.get("max_context_messages", 50)
self.claude_model: str = self.config.get("claude_model", "sonnet")
self.claude_flags: list[str] = s_load_config method · python · L72-L79 (8 LOC)src/agent.py
def _load_config(self) -> dict:
"""Загрузить и обработать YAML конфиг с expandvars."""
with open(self.config_path, encoding="utf-8") as f:
raw = f.read()
# expandvars для секретов (${ME_BOT_TOKEN} → значение из .env)
expanded = os.path.expandvars(raw)
return yaml.safe_load(expanded)_parse_allowed_users method · python · L81-L90 (10 LOC)src/agent.py
def _parse_allowed_users(self) -> list[int]:
"""Разобрать список allowed_users, преобразовать в int."""
raw = self.config.get("allowed_users", [])
result = []
for u in raw:
try:
result.append(int(u))
except (ValueError, TypeError):
logger.warning(f"Невалидный user ID: {u}")
return resultis_user_allowed method · python · L92-L96 (5 LOC)src/agent.py
def is_user_allowed(self, user_id: int) -> bool:
"""Проверить, имеет ли пользователь доступ."""
if not self.allowed_users:
return True # Если список пуст — доступ всем
return user_id in self.allowed_usersWant fix-PRs on findings? Install Repobility's GitHub App · github.com/apps/repobility-bot
parse_skill_frontmatter method · python · L99-L114 (16 LOC)src/agent.py
def parse_skill_frontmatter(text: str) -> tuple[dict | None, str]:
"""
Разобрать YAML frontmatter из skill файла.
Returns:
(metadata dict или None, тело скилла без frontmatter)
"""
match = re.match(r"^---\s*\n(.*?)\n---\s*\n", text, re.DOTALL)
if not match:
return None, text
try:
meta = yaml.safe_load(match.group(1))
except yaml.YAMLError:
return None, text
body = text[match.end():]
return meta, bodycheck_skill_requirements method · python · L117-L132 (16 LOC)src/agent.py
def check_skill_requirements(meta: dict) -> tuple[bool, list[str]]:
"""
Проверить зависимости скилла.
Returns:
(ok, список ошибок)
"""
errors = []
reqs = meta.get("requirements", {})
for cmd in reqs.get("commands", []):
if not shutil.which(cmd):
errors.append(f"команда '{cmd}' не найдена")
for env_var in reqs.get("env", []):
if not os.environ.get(env_var):
errors.append(f"переменная '{env_var}' не задана")
return len(errors) == 0, errors_load_skills method · python · L134-L173 (40 LOC)src/agent.py
def _load_skills(self) -> str:
"""
Загрузить скиллы из agents/{name}/skills/*.md.
Поддерживает YAML frontmatter:
- description: описание скилла
- requirements.commands: проверка через shutil.which()
- requirements.env: проверка через os.environ
- always: true = всегда в system prompt (иначе по имени из agent.yaml)
"""
skills_dir = Path(self.agent_dir) / "skills"
if not skills_dir.exists():
return ""
parts = []
for skill_file in sorted(skills_dir.glob("*.md")):
raw = skill_file.read_text(encoding="utf-8")
meta, body = self.parse_skill_frontmatter(raw)
skill_name = skill_file.stem
# Фильтрация: если не always и не в списке skills из yaml — пропустить
if meta and not meta.get("always", False):
if self.skill_names and skill_name not in self.skill_names:
continue
# Пров_load_soul method · python · L175-L180 (6 LOC)src/agent.py
def _load_soul(self) -> str:
"""Загрузить SOUL.md из директории агента."""
soul_path = Path(self.agent_dir) / "SOUL.md"
if soul_path.exists():
return soul_path.read_text(encoding="utf-8")
return ""build_system_prompt method · python · L182-L213 (32 LOC)src/agent.py
def build_system_prompt(self) -> str:
"""
Собрать полный system prompt:
SOUL.md + system_prompt из YAML + skills + memory context
"""
parts = []
# 1. SOUL.md — личность агента
soul = self._load_soul()
if soul:
parts.append(soul)
# 2. System prompt из agent.yaml
if self.system_prompt_template:
parts.append(self.system_prompt_template)
# 3. Скиллы
skills = self._load_skills()
if skills:
parts.append("## Скиллы\n\n" + skills)
# 4. Контекст из памяти (profile.md, index.md, daily note)
ctx = memory.read_context(self.agent_dir)
if ctx:
parts.append("## Контекст из памяти\n\n" + ctx)
# 5. Языковая инструкция
lang = memory.get_setting(self.agent_dir, "language")
if lang:
parts.append(t("system_lang_instruction", lang))
return "\n\n---\n\n".join(parts)build_group_system_prompt method · python · L215-L258 (44 LOC)src/agent.py
def build_group_system_prompt(self, chat_id: int) -> str:
"""
Собрать system prompt для группового чата.
Отличия от build_system_prompt:
- Вместо profile.md → groups/{chat_id}/context.md
- Вместо личного daily → groups/{chat_id}/daily/
- Добавляет инструкцию для группового режима
- НЕ включает личную wiki/profile владельца
"""
parts = []
# 1. SOUL.md — личность (общая)
soul = self._load_soul()
if soul:
parts.append(soul)
# 2. System prompt из agent.yaml (общий)
if self.system_prompt_template:
parts.append(self.system_prompt_template)
# 3. Инструкция для группового режима
group_instructions = (
"## Режим группового чата\n\n"
"Ты в групповом чате. Правила:\n"
"- Отвечай только когда к тебе обращаются\n"
"- Учитывай контекст предыдущих сообщений (они в логе ниже)\n"
"- Обр_parse_allowed_tools method · python · L260-L266 (7 LOC)src/agent.py
def _parse_allowed_tools(self) -> list[str] | None:
"""Извлечь allowed tools из claude_flags."""
flags = self.claude_flags
for i, flag in enumerate(flags):
if flag == "--allowedTools" and i + 1 < len(flags):
return flags[i + 1].split(",")
return Nonecall_claude method · python · L268-L428 (161 LOC)src/agent.py
async def call_claude(
self,
message: str,
files: list[str] | None = None,
semaphore: asyncio.Semaphore | None = None,
on_tool_use: Callable[[str], Awaitable[None]] | None = None,
on_text_delta: Callable[[str], Awaitable[None]] | None = None,
group_chat_id: int | None = None,
) -> str:
"""
Вызвать Claude через claude-agent-sdk.
Args:
message: текст сообщения от пользователя
files: список путей к файлам (будут упомянуты в промпте)
semaphore: глобальный семафор для ограничения параллельных вызовов
on_tool_use: async callback, вызывается с tool hint строкой
при каждом использовании инструмента
on_text_delta: async callback, вызывается при каждом TextBlock
для streaming ответа
group_chat_id: ID группового чата (None для DM)
Returns:
Текстовый ответ от Claude
Repobility's GitHub App fixes findings like these · https://github.com/apps/repobility-bot
AgentWorker class · python · L20-L157 (138 LOC)src/agent_worker.py
class AgentWorker:
"""
Воркер агента: читает из bus → call_claude → публикует ответ.
Одновременно обрабатывает по одному сообщению на chat_id
(сериализация через _active набор).
"""
def __init__(
self,
agent: "Agent",
bus: FleetBus,
semaphore: asyncio.Semaphore,
):
self.agent = agent
self.bus = bus
self.semaphore = semaphore
self._queue_name = f"agent:{agent.name}"
self._running = False
# Активные chat_id → Task (для /stop)
self._active_tasks: dict[int, asyncio.Task] = {}
def cancel_task(self, chat_id: int) -> bool:
"""Отменить активную задачу для chat_id (для /stop)."""
task = self._active_tasks.get(chat_id)
if task and not task.done():
task.cancel()
self._active_tasks.pop(chat_id, None)
return True
return False
async def run(self) -> None:
"""Основной цикл: читать из bus, обрабатыват__init__ method · python · L28-L40 (13 LOC)src/agent_worker.py
def __init__(
self,
agent: "Agent",
bus: FleetBus,
semaphore: asyncio.Semaphore,
):
self.agent = agent
self.bus = bus
self.semaphore = semaphore
self._queue_name = f"agent:{agent.name}"
self._running = False
# Активные chat_id → Task (для /stop)
self._active_tasks: dict[int, asyncio.Task] = {}cancel_task method · python · L42-L49 (8 LOC)src/agent_worker.py
def cancel_task(self, chat_id: int) -> bool:
"""Отменить активную задачу для chat_id (для /stop)."""
task = self._active_tasks.get(chat_id)
if task and not task.done():
task.cancel()
self._active_tasks.pop(chat_id, None)
return True
return Falserun method · python · L51-L69 (19 LOC)src/agent_worker.py
async def run(self) -> None:
"""Основной цикл: читать из bus, обрабатывать, отвечать."""
self._running = True
logger.info(f"AgentWorker '{self.agent.name}' запущен")
while self._running:
try:
msg = await self.bus.consume(self._queue_name)
# Обработать в отдельной задаче (не блокируем очередь)
task = asyncio.create_task(self._handle_message(msg))
if msg.chat_id:
self._active_tasks[msg.chat_id] = task
except asyncio.CancelledError:
logger.info(f"AgentWorker '{self.agent.name}' остановлен")
break
except Exception as e:
logger.error(f"AgentWorker '{self.agent.name}' error: {e}")
self._running = False_handle_message method · python · L71-L153 (83 LOC)src/agent_worker.py
async def _handle_message(self, msg: FleetMessage) -> None:
"""Обработать одно входящее сообщение."""
chat_id = msg.chat_id
# Пробросить thread_id через все ответные сообщения
thread_id = msg.metadata.get("message_thread_id")
base_meta = {"message_thread_id": thread_id} if thread_id else {}
# Уведомить bridge что начали обработку
await self.bus.publish(FleetMessage(
source=f"agent:{self.agent.name}",
target=f"telegram:{self.agent.name}",
content="",
msg_type=MessageType.SYSTEM,
chat_id=chat_id,
metadata={**base_meta, "event": "processing_started"},
))
try:
# Колбек для tool hints — пересылаем через bus
async def on_tool_use(hint: str):
await self.bus.publish(FleetMessage(
source=f"agent:{self.agent.name}",
target=f"telegram:{self.agent.name}",
stop method · python · L155-L157 (3 LOC)src/agent_worker.py
def stop(self) -> None:
"""Остановить воркер."""
self._running = FalseMessageType class · python · L21-L26 (6 LOC)src/bus.py
class MessageType(Enum):
"""Типы сообщений в шине."""
INBOUND = "inbound" # Входящее от пользователя
OUTBOUND = "outbound" # Ответ агента → канал
AGENT_TO_AGENT = "a2a" # Между агентами
SYSTEM = "system" # Системные (heartbeat, dream, etc.)FleetMessage class · python · L30-L42 (13 LOC)src/bus.py
class FleetMessage:
"""Единый формат сообщения в шине."""
source: str # "telegram", "agent:me", "system"
target: str # "orchestrator", "agent:coder", "telegram:{chat_id}"
content: str
msg_type: MessageType = MessageType.INBOUND
session_id: str = ""
chat_id: int = 0
user_id: int = 0
files: list[str] = field(default_factory=list)
metadata: dict = field(default_factory=dict)
id: str = field(default_factory=lambda: uuid.uuid4().hex[:12])
timestamp: float = field(default_factory=time.time)Repobility analyzer · published findings · https://repobility.com
FleetBus class · python · L45-L148 (104 LOC)src/bus.py
class FleetBus:
"""
Pub/sub шина сообщений на asyncio.Queue.
Подписчики регистрируются по имени (например "orchestrator", "agent:me").
Сообщения доставляются по полю target:
- Точный адрес: "agent:me" → очередь "agent:me"
- Broadcast: "*" → все подписчики
- Prefix: "agent:*" → все подписчики с префиксом "agent:"
"""
def __init__(self):
self._queues: dict[str, asyncio.Queue] = {}
self._running = False
def subscribe(self, name: str, maxsize: int = 100) -> asyncio.Queue:
"""
Зарегистрировать подписчика.
Args:
name: уникальное имя ("orchestrator", "agent:me", "telegram")
maxsize: размер очереди
Returns:
asyncio.Queue для чтения сообщений
"""
if name in self._queues:
return self._queues[name]
q: asyncio.Queue = asyncio.Queue(maxsize=maxsize)
self._queues[name] = q
logger.info(f"Bus: подписчик '{name}' зарегистрир__init__ method · python · L56-L58 (3 LOC)src/bus.py
def __init__(self):
self._queues: dict[str, asyncio.Queue] = {}
self._running = Falsesubscribe method · python · L60-L76 (17 LOC)src/bus.py
def subscribe(self, name: str, maxsize: int = 100) -> asyncio.Queue:
"""
Зарегистрировать подписчика.
Args:
name: уникальное имя ("orchestrator", "agent:me", "telegram")
maxsize: размер очереди
Returns:
asyncio.Queue для чтения сообщений
"""
if name in self._queues:
return self._queues[name]
q: asyncio.Queue = asyncio.Queue(maxsize=maxsize)
self._queues[name] = q
logger.info(f"Bus: подписчик '{name}' зарегистрирован")
return qunsubscribe method · python · L78-L81 (4 LOC)src/bus.py
def unsubscribe(self, name: str) -> None:
"""Удалить подписчика."""
self._queues.pop(name, None)
logger.info(f"Bus: подписчик '{name}' удалён")publish method · python · L83-L115 (33 LOC)src/bus.py
async def publish(self, msg: FleetMessage) -> int:
"""
Опубликовать сообщение в шину.
Маршрутизация по msg.target:
- "*" → все подписчики
- "agent:*" → все agent:*
- "agent:me" → точно agent:me
Returns:
Количество получателей
"""
delivered = 0
target = msg.target
for name, queue in self._queues.items():
if self._matches(name, target):
try:
queue.put_nowait(msg)
delivered += 1
except asyncio.QueueFull:
logger.warning(
f"Bus: очередь '{name}' переполнена, "
f"сообщение от '{msg.source}' потеряно"
)
if delivered == 0:
logger.warning(
f"Bus: нет получателей для target='{target}' "
f"(source='{msg.source}')"
)
return deliveredconsume method · python · L117-L133 (17 LOC)src/bus.py
async def consume(self, name: str) -> FleetMessage:
"""
Прочитать одно сообщение из очереди подписчика.
Args:
name: имя подписчика
Returns:
FleetMessage
Raises:
KeyError: если подписчик не зарегистрирован
"""
queue = self._queues.get(name)
if queue is None:
raise KeyError(f"Подписчик '{name}' не зарегистрирован")
return await queue.get()_matches method · python · L141-L148 (8 LOC)src/bus.py
def _matches(subscriber: str, target: str) -> bool:
"""Проверить, подходит ли подписчик под target."""
if target == "*":
return True
if target.endswith(":*"):
prefix = target[:-1] # "agent:*" → "agent:"
return subscriber.startswith(prefix)
return subscriber == targetfind_root function · python · L17-L23 (7 LOC)src/cli.py
def find_root() -> Path:
"""Найти корень проекта."""
current = Path.cwd()
for parent in [current] + list(current.parents):
if (parent / "agents").is_dir():
return parent
return Path(__file__).parent.parentSame scanner, your repo: https://repobility.com — Repobility
cmd_list_agents function · python · L26-L46 (21 LOC)src/cli.py
def cmd_list_agents(manager: AgentManager) -> None:
"""Таблица всех агентов."""
agents = manager.list_agents()
if not agents:
print("Агенты не найдены.")
return
# Форматирование таблицы
header = f"{'Имя':<15} {'Название':<25} {'Модель':<10} {'Токен':<10}"
print(header)
print("-" * len(header))
for a in agents:
token_status = "✓ задан" if a["token_set"] else "✗ нет"
print(
f"{a['name']:<15} {a['display_name']:<25} "
f"{a['model']:<10} {token_status:<10}"
)
print(f"\nВсего: {len(agents)} агентов")cmd_validate function · python · L49-L70 (22 LOC)src/cli.py
def cmd_validate(manager: AgentManager) -> None:
"""Валидация всех агентов."""
results = manager.validate_all()
if not results:
print("Агенты не найдены.")
return
all_ok = True
for name, (ok, errors) in results.items():
if ok:
print(f" ✓ {name}")
else:
all_ok = False
print(f" ✗ {name}:")
for err in errors:
print(f" - {err}")
if all_ok:
print(f"\nВсе {len(results)} агентов валидны.")
else:
print("\nЕсть ошибки. Исправь и запусти снова.")cmd_create_agent function · python · L73-L138 (66 LOC)src/cli.py
def cmd_create_agent(manager: AgentManager) -> None:
"""Интерактивный визард создания агента."""
print("=== Создание нового агента ===\n")
# 1. Имя
while True:
name = input("Имя агента (латиницей, для папки): ").strip().lower()
if not name:
print(" Имя не может быть пустым.")
continue
if (manager.agents_dir / name).exists():
print(f" Агент '{name}' уже существует.")
continue
break
# 2. Отображаемое имя
display_name = input("Отображаемое имя (на русском): ").strip()
if not display_name:
display_name = name.title()
# 3. Токен бота
while True:
bot_token = input("Токен бота (от @BotFather): ").strip()
if not bot_token:
print(" Токен не может быть пустым.")
continue
break
# 4. Описание роли
description = input("Описание роли (одно предложение): ").strip()
if not description:
description = f"AI-аpage 1 / 4next ›