Function bodies 565 total
BridgeServer class · typescript · L20-L129 (110 LOC)bridge/src/server.ts
export class BridgeServer {
private wss: WebSocketServer | null = null;
private wa: WhatsAppClient | null = null;
private clients: Set<WebSocket> = new Set();
constructor(private port: number, private authDir: string, private token?: string) {}
async start(): Promise<void> {
// Bind to localhost only — never expose to external network
this.wss = new WebSocketServer({ host: '127.0.0.1', port: this.port });
console.log(`🌉 Bridge server listening on ws://127.0.0.1:${this.port}`);
if (this.token) console.log('🔒 Token authentication enabled');
// Initialize WhatsApp client
this.wa = new WhatsAppClient({
authDir: this.authDir,
onMessage: (msg) => this.broadcast({ type: 'message', ...msg }),
onQR: (qr) => this.broadcast({ type: 'qr', qr }),
onStatus: (status) => this.broadcast({ type: 'status', status }),
});
// Handle WebSocket connections
this.wss.on('connection', (ws) => {
if (this.token) {
// Require astart method · typescript · L27-L68 (42 LOC)bridge/src/server.ts
async start(): Promise<void> {
// Bind to localhost only — never expose to external network
this.wss = new WebSocketServer({ host: '127.0.0.1', port: this.port });
console.log(`🌉 Bridge server listening on ws://127.0.0.1:${this.port}`);
if (this.token) console.log('🔒 Token authentication enabled');
// Initialize WhatsApp client
this.wa = new WhatsAppClient({
authDir: this.authDir,
onMessage: (msg) => this.broadcast({ type: 'message', ...msg }),
onQR: (qr) => this.broadcast({ type: 'qr', qr }),
onStatus: (status) => this.broadcast({ type: 'status', status }),
});
// Handle WebSocket connections
this.wss.on('connection', (ws) => {
if (this.token) {
// Require auth handshake as first message
const timeout = setTimeout(() => ws.close(4001, 'Auth timeout'), 5000);
ws.once('message', (data) => {
clearTimeout(timeout);
try {
const msg = JSON.parse(data.toString());
setupClient method · typescript · L70-L93 (24 LOC)bridge/src/server.ts
/**
 * Registers a client socket and attaches its message, close and error handlers.
 *
 * Every incoming frame is parsed as JSON and handed to `handleCommand`.
 * When that resolves, a `{ type: 'sent', to }` acknowledgement is written back;
 * any exception raised along the way is logged and reported to the client as
 * `{ type: 'error', error }`.
 *
 * @param ws - The client socket to track and listen on.
 */
private setupClient(ws: WebSocket): void {
  this.clients.add(ws);

  // Shared cleanup used by both the close and the error handlers.
  const forget = (): void => {
    this.clients.delete(ws);
  };

  ws.on('message', async (raw) => {
    try {
      const command = JSON.parse(raw.toString()) as SendCommand;
      await this.handleCommand(command);
      const ack = { type: 'sent', to: command.to };
      ws.send(JSON.stringify(ack));
    } catch (err) {
      console.error('Error handling command:', err);
      const failure = { type: 'error', error: String(err) };
      ws.send(JSON.stringify(failure));
    }
  });

  ws.on('close', () => {
    console.log('🔌 Python client disconnected');
    forget();
  });

  ws.on('error', (err) => {
    console.error('WebSocket error:', err);
    forget();
  });
}
// handleCommand method · typescript · L95-L99 (5 LOC)bridge/src/server.ts
/**
 * Executes a command received from a bridge client.
 *
 * Only `send` commands are supported; they are forwarded to the WhatsApp
 * client. Anything that cannot be executed is rejected with an error instead
 * of being silently ignored, so the caller never acknowledges a message that
 * was not actually handed to WhatsApp.
 *
 * @param cmd - The parsed command from the client.
 * @throws Error if the command type is not `send` or the WhatsApp client
 *   has not been created (or was already torn down by `stop`).
 */
private async handleCommand(cmd: SendCommand): Promise<void> {
  // Widen to string: the payload comes from JSON.parse and is unvalidated.
  const kind: string = cmd.type;
  if (kind !== 'send') {
    throw new Error(`Unsupported command type: ${kind}`);
  }
  if (!this.wa) {
    throw new Error('Not connected');
  }
  await this.wa.sendMessage(cmd.to, cmd.text);
}
// broadcast method · typescript · L101-L108 (8 LOC)bridge/src/server.ts
/**
 * Sends a bridge message to every tracked client whose socket is open.
 *
 * The message is serialized once and the same string is written to each
 * client; sockets that are not in the OPEN state are skipped.
 *
 * @param msg - The message to serialize and fan out.
 */
private broadcast(msg: BridgeMessage): void {
  const payload = JSON.stringify(msg);
  this.clients.forEach((socket) => {
    if (socket.readyState !== WebSocket.OPEN) {
      return;
    }
    socket.send(payload);
  });
}
// stop method · typescript · L110-L128 (19 LOC)bridge/src/server.ts
/**
 * Shuts the bridge down: closes every client socket, closes the WebSocket
 * server, then disconnects the WhatsApp client.
 *
 * Both `wss` and `wa` are reset to `null` once they have been released.
 */
async stop(): Promise<void> {
  // Drop all connected clients first.
  this.clients.forEach((socket) => socket.close());
  this.clients.clear();

  // Tear down the listening server.
  this.wss?.close();
  this.wss = null;

  // Finally release the WhatsApp session.
  const whatsapp = this.wa;
  if (!whatsapp) {
    return;
  }
  await whatsapp.disconnect();
  this.wa = null;
}
// WhatsAppClient class · typescript · L36-L187 (152 LOC)bridge/src/whatsapp.ts
export class WhatsAppClient {
private sock: any = null;
private options: WhatsAppClientOptions;
private reconnecting = false;
constructor(options: WhatsAppClientOptions) {
this.options = options;
}
async connect(): Promise<void> {
const logger = pino({ level: 'silent' });
const { state, saveCreds } = await useMultiFileAuthState(this.options.authDir);
const { version } = await fetchLatestBaileysVersion();
console.log(`Using Baileys version: ${version.join('.')}`);
// Create socket following OpenClaw's pattern
this.sock = makeWASocket({
auth: {
creds: state.creds,
keys: makeCacheableSignalKeyStore(state.keys, logger),
},
version,
logger,
printQRInTerminal: false,
browser: ['nanobot', 'cli', VERSION],
syncFullHistory: false,
markOnlineOnConnect: false,
});
// Handle WebSocket errors
if (this.sock.ws && typeof this.sock.ws.on === 'function') {
this.sock.ws.on('erroAll rows scored by the Repobility analyzer (https://repobility.com)
constructor method · typescript · L41-L43 (3 LOC)bridge/src/whatsapp.ts
/**
 * Creates a client that keeps the given options for later use.
 *
 * @param options - Client options stored on the instance as `this.options`.
 */
constructor(options: WhatsAppClientOptions) {
  // Nothing else happens here; the options are only stored.
  this.options = options;
}
// connect method · typescript · L45-L134 (90 LOC)bridge/src/whatsapp.ts
async connect(): Promise<void> {
const logger = pino({ level: 'silent' });
const { state, saveCreds } = await useMultiFileAuthState(this.options.authDir);
const { version } = await fetchLatestBaileysVersion();
console.log(`Using Baileys version: ${version.join('.')}`);
// Create socket following OpenClaw's pattern
this.sock = makeWASocket({
auth: {
creds: state.creds,
keys: makeCacheableSignalKeyStore(state.keys, logger),
},
version,
logger,
printQRInTerminal: false,
browser: ['nanobot', 'cli', VERSION],
syncFullHistory: false,
markOnlineOnConnect: false,
});
// Handle WebSocket errors
if (this.sock.ws && typeof this.sock.ws.on === 'function') {
this.sock.ws.on('error', (err: Error) => {
console.error('WebSocket error:', err.message);
});
}
// Handle connection updates
this.sock.ev.on('connection.update', async (update: any) => {
const { connecsetTimeout method · typescript · L94-L97 (4 LOC)bridge/src/whatsapp.ts
setTimeout(() => {
this.reconnecting = false;
this.connect();
}, 5000);extractMessageContent method · typescript · L136-L171 (36 LOC)bridge/src/whatsapp.ts
/**
 * Derives a text representation from an incoming message object.
 *
 * Precedence: plain `conversation` text, then `extendedTextMessage.text`,
 * then the caption of an image, video or document (prefixed with its kind),
 * then a fixed placeholder for audio messages.
 *
 * @param msg - Message object; only its `message` property is inspected.
 * @returns The extracted text, or `null` when `msg.message` is missing or
 *   none of the recognised fields carries content.
 */
private extractMessageContent(msg: any): string | null {
  const body = msg.message;
  if (!body) {
    return null;
  }

  // Plain text first, then extended text (reply, link preview).
  const plain = body.conversation || body.extendedTextMessage?.text;
  if (plain) {
    return plain;
  }

  // Media kinds are only reported when they carry a caption.
  const captioned: Array<[string, any]> = [
    ['Image', body.imageMessage?.caption],
    ['Video', body.videoMessage?.caption],
    ['Document', body.documentMessage?.caption],
  ];
  for (const [label, caption] of captioned) {
    if (caption) {
      return `[${label}] ${caption}`;
    }
  }

  // Voice/audio has no text, so a placeholder stands in for it.
  return body.audioMessage ? `[Voice Message]` : null;
}
// sendMessage method · typescript · L173-L179 (7 LOC)bridge/src/whatsapp.ts
/**
 * Sends a text message through the current socket.
 *
 * @param to - Recipient identifier passed straight to the socket.
 * @param text - Message text, sent as `{ text }`.
 * @throws Error with message `Not connected` when no socket is set.
 */
async sendMessage(to: string, text: string): Promise<void> {
  const socket = this.sock;
  if (!socket) {
    throw new Error('Not connected');
  }
  await socket.sendMessage(to, { text });
}
// disconnect method · typescript · L181-L186 (6 LOC)bridge/src/whatsapp.ts
/**
 * Ends the current socket, if any, and forgets it.
 *
 * Calling this when no socket is set does nothing.
 */
async disconnect(): Promise<void> {
  if (!this.sock) {
    return;
  }
  this.sock.end(undefined);
  this.sock = null;
}
// ContextBuilder class · python · L16-L406 (391 LOC)nanobot/agent/context.py
class ContextBuilder:
"""
Builds the context (system prompt + messages) for the agent.
Assembles bootstrap files, memory, skills, and conversation history
into a coherent prompt for the LLM.
"""
BOOTSTRAP_FILES = ["AGENTS.md", "SOUL.md", "USER.md", "TOOLS.md", "IDENTITY.md"]
def __init__(self, workspace: Path, media_config: "MediaConfig | None" = None, vision_api_key: str = ""):
from nanobot.config.schema import MediaConfig
self.workspace = workspace
self.memory = MemoryStore(workspace)
self.skills = SkillsLoader(workspace)
self.media_config = media_config or MediaConfig()
self.vision_api_key = vision_api_key # API key for vision model
def build_system_prompt(self, skill_names: list[str] | None = None) -> str:
"""
Build the system prompt from bootstrap files, memory, and skills.
Args:
skill_names: Optional list of skills to include.
Retu__init__ method · python · L26-L32 (7 LOC)nanobot/agent/context.py
def __init__(self, workspace: Path, media_config: "MediaConfig | None" = None, vision_api_key: str = ""):
    """Set up the builder for one workspace.

    Args:
        workspace: Workspace directory; also handed to the memory store
            and the skills loader created here.
        media_config: Media settings; a default MediaConfig is created
            when none is given.
        vision_api_key: API key for the vision model.
    """
    # Imported here rather than at module level, as in the original code.
    from nanobot.config.schema import MediaConfig

    self.workspace = workspace
    self.memory = MemoryStore(workspace)
    self.skills = SkillsLoader(workspace)
    self.vision_api_key = vision_api_key  # API key for vision model
    self.media_config = media_config if media_config else MediaConfig()
# Want this analysis on your repo? https://repobility.com/scan/
build_system_prompt method · python · L34-L77 (44 LOC)nanobot/agent/context.py
def build_system_prompt(self, skill_names: list[str] | None = None) -> str:
"""
Build the system prompt from bootstrap files, memory, and skills.
Args:
skill_names: Optional list of skills to include.
Returns:
Complete system prompt.
"""
parts = []
# Core identity
parts.append(self._get_identity())
# Bootstrap files
bootstrap = self._load_bootstrap_files()
if bootstrap:
parts.append(bootstrap)
# Memory context
memory = self.memory.get_memory_context()
if memory:
parts.append(f"# Memory\n\n{memory}")
# Skills - progressive loading
# 1. Always-loaded skills: include full content
always_skills = self.skills.get_always_skills()
if always_skills:
always_content = self.skills.load_skills_for_context(always_skills)
if alway_get_identity method · python · L79-L116 (38 LOC)nanobot/agent/context.py
def _get_identity(self) -> str:
"""Get the core identity section."""
from datetime import datetime
import time as _time
now = datetime.now().strftime("%Y-%m-%d %H:%M (%A)")
tz = _time.strftime("%Z") or "UTC"
workspace_path = str(self.workspace.expanduser().resolve())
system = platform.system()
runtime = f"{'macOS' if system == 'Darwin' else system} {platform.machine()}, Python {platform.python_version()}"
return f"""# nanobot 🐈
You are nanobot, a helpful AI assistant. You have access to tools that allow you to:
- Read, write, and edit files
- Execute shell commands
- Search the web and fetch web pages
- Send messages to users on chat channels
- Spawn subagents for complex background tasks
## Current Time
{now} ({tz})
## Runtime
{runtime}
## Workspace
Your workspace is at: {workspace_path}
- Long-term memory: {workspace_path}/memory/MEMORY.md
- History log: {workspace_path}/memory/HISTORY.md (grep-s_load_bootstrap_files method · python · L118-L128 (11 LOC)nanobot/agent/context.py
def _load_bootstrap_files(self) -> str:
"""Load all bootstrap files from workspace."""
parts = []
for filename in self.BOOTSTRAP_FILES:
file_path = self.workspace / filename
if file_path.exists():
content = file_path.read_text(encoding="utf-8")
parts.append(f"## {filename}\n\n{content}")
return "\n\n".join(parts) if parts else ""build_messages method · python · L130-L168 (39 LOC)nanobot/agent/context.py
async def build_messages(
self,
history: list[dict[str, Any]],
current_message: str,
skill_names: list[str] | None = None,
media: list[str] | None = None,
channel: str | None = None,
chat_id: str | None = None,
) -> list[dict[str, Any]]:
"""
Build the complete message list for an LLM call.
Args:
history: Previous conversation messages.
current_message: The new user message.
skill_names: Optional skills to include.
media: Optional list of local file paths for images/media.
channel: Current channel (telegram, feishu, etc.).
chat_id: Current chat/user ID.
Returns:
List of messages including system prompt.
"""
messages = []
# System prompt
system_prompt = self.build_system_prompt(skill_names)
if channel and chat_id:
system_prompt += f"\n\n## Current Session\n_build_user_content_async method · python · L170-L228 (59 LOC)nanobot/agent/context.py
async def _build_user_content_async(self, text: str, media: list[str] | None) -> str | list[dict[str, Any]]:
"""Build user message content with optional images and AI understanding.
If image understanding is enabled, describes each image using a vision model
and includes the description in the message.
"""
if not media:
return text
# Check if we need image understanding
use_understanding = (
self.media_config.image.understanding
and self.vision_provider
)
descriptions = []
if use_understanding:
for path in media:
p = Path(path)
if p.is_file():
desc = await self.describe_image(p)
if desc:
descriptions.append(desc)
# Build images list
images = []
for path in media:
p = Path(path)
mime, _ = mimetypes.guess_type(pa_build_user_content method · python · L230-L257 (28 LOC)nanobot/agent/context.py
def _build_user_content(self, text: str, media: list[str] | None) -> str | list[dict[str, Any]]:
"""Build user message content with optional base64-encoded images.
Images are processed (resized, compressed) before encoding if media processing is enabled.
"""
if not media:
return text
images = []
for path in media:
p = Path(path)
mime, _ = mimetypes.guess_type(path)
if not p.is_file() or not mime or not mime.startswith("image/"):
continue
# Process image if media processing is enabled
if self.media_config.image.enabled:
processed_data = self._process_image(p)
else:
processed_data = p.read_bytes()
mime = "image/jpeg" # Default to jpeg for processed images
b64 = base64.b64encode(processed_data).decode()
images.append({"type": "image_url", "image_url": {"url": f"da_process_image method · python · L259-L292 (34 LOC)nanobot/agent/context.py
def _process_image(self, path: Path) -> tuple[bytes, str]:
"""Process image: resize, compress, and limit file size.
Returns:
Tuple of (processed_bytes, mime_type)
"""
img = Image.open(path)
config = self.media_config.image
# Convert to RGB if necessary (handles RGBA, palette, etc.)
if img.mode not in ("RGB", "L"): # L is grayscale
img = img.convert("RGB")
# Resize if larger than max_size (maintaining aspect ratio)
max_dim = config.max_size
if img.width > max_dim or img.height > max_dim:
img.thumbnail((max_dim, max_dim), Image.Resampling.LANCZOS)
# Compress to JPEG with quality setting
output = io.BytesIO()
img.save(output, format="JPEG", quality=config.quality, optimize=True)
data = output.getvalue()
# Further compress if still over max_bytes
if len(data) > config.max_bytes:
# Reduce quality until underdescribe_image method · python · L294-L344 (51 LOC)nanobot/agent/context.py
async def describe_image(self, image_path: Path) -> str | None:
"""Describe an image using MiniMax VLM API.
Args:
image_path: Path to the image file.
Returns:
Description of the image, or None if description fails.
"""
if not self.media_config.image.understanding:
return None
if not self.vision_api_key:
return None
try:
# Process image first
processed_data, mime = self._process_image(image_path)
b64 = base64.b64encode(processed_data).decode()
# Call MiniMax VLM API directly (like OpenCLAW does)
import httpx
url = "https://api.minimax.io/v1/coding_plan/vlm"
headers = {
"Authorization": f"Bearer {self.vision_api_key}",
"Content-Type": "application/json",
"MM-API-Source": "nanobot"
}
payload = {
"prompt": "Powered by Repobility — scan your code at https://repobility.com
add_tool_result method · python · L346-L371 (26 LOC)nanobot/agent/context.py
def add_tool_result(
    self,
    messages: list[dict[str, Any]],
    tool_call_id: str,
    tool_name: str,
    result: str
) -> list[dict[str, Any]]:
    """
    Append a tool-result entry to the conversation.

    The list is modified in place and the same list object is returned.

    Args:
        messages: Current message list.
        tool_call_id: ID of the tool call being answered.
        tool_name: Name of the tool that produced the result.
        result: Tool execution result.

    Returns:
        The message list that was passed in, now ending with the new entry.
    """
    tool_entry = {
        "role": "tool",
        "tool_call_id": tool_call_id,
        "name": tool_name,
        "content": result,
    }
    messages.append(tool_entry)
    return messages
# add_assistant_message method · python · L373-L406 (34 LOC)nanobot/agent/context.py
def add_assistant_message(
self,
messages: list[dict[str, Any]],
content: str | None,
tool_calls: list[dict[str, Any]] | None = None,
reasoning_content: str | None = None,
) -> list[dict[str, Any]]:
"""
Add an assistant message to the message list.
Args:
messages: Current message list.
content: Message content.
tool_calls: Optional tool calls.
reasoning_content: Thinking output (Kimi, DeepSeek-R1, etc.).
Returns:
Updated message list.
"""
msg: dict[str, Any] = {"role": "assistant"}
# Always include content — some providers (e.g. StepFun) reject
# assistant messages that omit the key entirely.
msg["content"] = content
if tool_calls:
msg["tool_calls"] = tool_calls
# Include reasoning content when provided (required by some thinking models)
if reasoning_co__init__ method · python · L54-L115 (62 LOC)nanobot/agent/loop.py
def __init__(
self,
bus: MessageBus,
provider: LLMProvider,
workspace: Path,
model: str | None = None,
max_task_duration: int = 600, # seconds (default 10 minutes)
temperature: float = 0.7,
max_tokens: int = 4096,
memory_window: int = 50,
brave_api_key: str | None = None,
tavily_api_key: str | None = None,
gemini_api_key: str | None = None,
exec_config: ExecToolConfig | None = None,
cron_service: CronService | None = None,
restrict_to_workspace: bool = False,
session_manager: SessionManager | None = None,
mcp_servers: dict | None = None,
media_config: "MediaConfig | None" = None,
vision_api_key: str = "",
):
from nanobot.config.schema import ExecToolConfig, MediaConfig
self.bus = bus
self.provider = provider
self.workspace = workspace
self.model = model or provider.get_default_model()
_register_default_tools method · python · L117-L163 (47 LOC)nanobot/agent/loop.py
def _register_default_tools(self) -> None:
"""Register the default set of tools."""
# File tools (workspace for relative paths, restrict if configured)
allowed_dir = self.workspace if self.restrict_to_workspace else None
self.tools.register(ReadFileTool(workspace=self.workspace, allowed_dir=allowed_dir))
self.tools.register(WriteFileTool(workspace=self.workspace, allowed_dir=allowed_dir))
self.tools.register(EditFileTool(workspace=self.workspace, allowed_dir=allowed_dir))
self.tools.register(ListDirTool(workspace=self.workspace, allowed_dir=allowed_dir))
# Shell tool
self.tools.register(ExecTool(
working_dir=str(self.workspace),
timeout=self.exec_config.timeout,
restrict_to_workspace=self.restrict_to_workspace,
))
# Web tools
self.tools.register(WebSearchTool(api_key=self.brave_api_key))
self.tools.register(TavilySearchTool(api_key=self.tavil_track_tool_call method · python · L165-L188 (24 LOC)nanobot/agent/loop.py
def _track_tool_call(self, tool_name: str, args: dict) -> bool:
"""
Track tool call for loop detection.
Returns True if loop detected (3+ consecutive identical calls), False otherwise.
"""
# Create a hashable key from tool_name + sorted args
args_str = json.dumps(args, sort_keys=True, ensure_ascii=False)
call_key = (tool_name, args_str)
# Check if this is the same as the last call
if self._tool_call_history and self._tool_call_history[-1] == call_key:
self._tool_call_history.append(call_key)
else:
# Reset history if different call
self._tool_call_history = [call_key]
# Check if we've exceeded the threshold
if len(self._tool_call_history) >= self._loop_detection_max:
# Check if ALL calls in history are identical
if all(key == self._tool_call_history[0] for key in self._tool_call_history):
logger._reset_tool_tracking method · python · L190-L192 (3 LOC)nanobot/agent/loop.py
def _reset_tool_tracking(self) -> None:
"""Reset tool call tracking at the start of each user request."""
self._tool_call_history = []_connect_mcp method · python · L194-L214 (21 LOC)nanobot/agent/loop.py
async def _connect_mcp(self) -> None:
"""Connect to configured MCP servers (one-time, lazy)."""
if self._mcp_connected or self._mcp_connecting or not self._mcp_servers:
return
self._mcp_connecting = True
from nanobot.agent.tools.mcp import connect_mcp_servers
try:
self._mcp_stack = AsyncExitStack()
await self._mcp_stack.__aenter__()
await connect_mcp_servers(self._mcp_servers, self.tools, self._mcp_stack)
self._mcp_connected = True
except Exception as e:
logger.error("Failed to connect MCP servers (will retry next message): {}", e)
if self._mcp_stack:
try:
await self._mcp_stack.aclose()
except Exception:
pass
self._mcp_stack = None
finally:
self._mcp_connecting = False_set_tool_context method · python · L216-L228 (13 LOC)nanobot/agent/loop.py
def _set_tool_context(self, channel: str, chat_id: str, message_id: str | None = None) -> None:
"""Update context for all tools that need routing info."""
if message_tool := self.tools.get("message"):
if isinstance(message_tool, MessageTool):
message_tool.set_context(channel, chat_id, message_id)
if spawn_tool := self.tools.get("spawn"):
if isinstance(spawn_tool, SpawnTool):
spawn_tool.set_context(channel, chat_id)
if cron_tool := self.tools.get("cron"):
if isinstance(cron_tool, CronTool):
cron_tool.set_context(channel, chat_id)Source: Repobility analyzer · https://repobility.com
_strip_think method · python · L231-L235 (5 LOC)nanobot/agent/loop.py
def _strip_think(text: str | None) -> str | None:
"""Remove <think>…</think> blocks that some models embed in content."""
if not text:
return None
return re.sub(r"<think>[\s\S]*?</think>", "", text).strip() or None_tool_hint method · python · L238-L245 (8 LOC)nanobot/agent/loop.py
def _tool_hint(tool_calls: list) -> str:
"""Format tool calls as concise hint, e.g. 'web_search("query")'."""
def _fmt(tc):
val = next(iter(tc.arguments.values()), None) if tc.arguments else None
if not isinstance(val, str):
return tc.name
return f'{tc.name}("{val[:40]}…")' if len(val) > 40 else f'{tc.name}("{val}")'
return ", ".join(_fmt(tc) for tc in tool_calls)_run_agent_loop method · python · L247-L460 (214 LOC)nanobot/agent/loop.py
async def _run_agent_loop(
self,
initial_messages: list[dict],
on_progress: Callable[[str], Awaitable[None]] | None = None,
) -> tuple[str | None, list[str]]:
"""
Run the agent iteration loop.
Args:
initial_messages: Starting messages for the LLM conversation.
on_progress: Optional callback to push intermediate content to the user.
Returns:
Tuple of (final_content, list_of_tools_used).
"""
# Initialize messages from initial_messages
messages = list(initial_messages)
# Force tool usage for certain keywords - get the LAST message (current user message)
user_message = ""
if initial_messages:
for msg in reversed(initial_messages):
if msg.get("role") == "user":
user_message = msg.get("content", "")
break
# Add mandatory tool hint for browser-related requests
run method · python · L462-L499 (38 LOC)nanobot/agent/loop.py
async def run(self) -> None:
"""Run the agent loop, processing messages from the bus."""
self._running = True
await self._connect_mcp()
logger.info("Agent loop started")
while self._running:
try:
msg = await asyncio.wait_for(
self.bus.consume_inbound(),
timeout=1.0
)
# Check for /halt command BEFORE processing (can interrupt current task)
cmd = msg.content.strip().lower()
if cmd == "/halt":
self._halt_requested = True
await self.bus.publish_outbound(OutboundMessage(
channel=msg.channel,
chat_id=msg.chat_id,
content="🛑 Task halted. Waiting for next instruction."
))
continue # Skip normal message processing
try:
response = awaclose_mcp method · python · L501-L508 (8 LOC)nanobot/agent/loop.py
async def close_mcp(self) -> None:
"""Close MCP connections."""
if self._mcp_stack:
try:
await self._mcp_stack.aclose()
except (RuntimeError, BaseExceptionGroup):
pass # MCP SDK cancel scope cleanup is noisy but harmless
self._mcp_stack = Nonestop method · python · L510-L513 (4 LOC)nanobot/agent/loop.py
def stop(self) -> None:
    """Clear the running flag so the agent loop can exit, and log the request."""
    self._running = False
    logger.info("Agent loop stopping")
# _process_message method · python · L515-L623 (109 LOC)nanobot/agent/loop.py
async def _process_message(
self,
msg: InboundMessage,
session_key: str | None = None,
on_progress: Callable[[str], Awaitable[None]] | None = None,
) -> OutboundMessage | None:
"""
Process a single inbound message.
Args:
msg: The inbound message to process.
session_key: Override session key (used by process_direct).
on_progress: Optional callback for intermediate output (defaults to bus publish).
Returns:
The response message, or None if no response needed.
"""
# System messages route back via chat_id ("channel:chat_id")
if msg.channel == "system":
return await self._process_system_message(msg)
preview = msg.content[:80] + "..." if len(msg.content) > 80 else msg.content
logger.info("Processing message from {}:{}: {}", msg.channel, msg.sender_id, preview)
key = session_key or msg.session_key
sessio_process_system_message method · python · L625-L666 (42 LOC)nanobot/agent/loop.py
async def _process_system_message(self, msg: InboundMessage) -> OutboundMessage | None:
"""
Process a system message (e.g., subagent announce).
The chat_id field contains "original_channel:original_chat_id" to route
the response back to the correct destination.
"""
logger.info("Processing system message from {}", msg.sender_id)
# Parse origin from chat_id (format: "channel:chat_id")
if ":" in msg.chat_id:
parts = msg.chat_id.split(":", 1)
origin_channel = parts[0]
origin_chat_id = parts[1]
else:
# Fallback
origin_channel = "cli"
origin_chat_id = msg.chat_id
session_key = f"{origin_channel}:{origin_chat_id}"
session = self.sessions.get_or_create(session_key)
self._set_tool_context(origin_channel, origin_chat_id, msg.metadata.get("message_id"))
initial_messages = await self.context.build_messages(
All rows scored by the Repobility analyzer (https://repobility.com)
_consolidate_memory method · python · L668-L765 (98 LOC)nanobot/agent/loop.py
async def _consolidate_memory(self, session, archive_all: bool = False) -> None:
"""Consolidate old messages into MEMORY.md + HISTORY.md.
Args:
archive_all: If True, clear all messages and reset session (for /new command).
If False, only write to files without modifying session.
"""
memory = MemoryStore(self.workspace)
if archive_all:
old_messages = session.messages
keep_count = 0
logger.info("Memory consolidation (archive_all): {} total messages archived", len(session.messages))
else:
keep_count = self.memory_window // 2
if len(session.messages) <= keep_count:
logger.debug("Session {}: No consolidation needed (messages={}, keep={})", session.key, len(session.messages), keep_count)
return
messages_to_process = len(session.messages) - session.last_consolidated
if messages_to_process <process_direct method · python · L767-L797 (31 LOC)nanobot/agent/loop.py
async def process_direct(
self,
content: str,
session_key: str = "cli:direct",
channel: str = "cli",
chat_id: str = "direct",
on_progress: Callable[[str], Awaitable[None]] | None = None,
) -> str:
"""
Process a message directly (for CLI or cron usage).
Args:
content: The message content.
session_key: Session identifier (overrides channel:chat_id for session lookup).
channel: Source channel (for tool context routing).
chat_id: Source chat ID (for tool context routing).
on_progress: Optional callback for intermediate output.
Returns:
The agent's response.
"""
await self._connect_mcp()
msg = InboundMessage(
channel=channel,
sender_id="user",
chat_id=chat_id,
content=content
)
response = await self._process_message(msg, session_key=session_keyMemoryStore class · python · L8-L30 (23 LOC)nanobot/agent/memory.py
class MemoryStore:
    """Two-layer memory: MEMORY.md (long-term facts) + HISTORY.md (grep-searchable log)."""

    def __init__(self, workspace: Path):
        """Resolve the memory directory and the two file paths inside it."""
        memory_dir = ensure_dir(workspace / "memory")
        self.memory_dir = memory_dir
        self.memory_file = memory_dir / "MEMORY.md"
        self.history_file = memory_dir / "HISTORY.md"

    def read_long_term(self) -> str:
        """Return the text of MEMORY.md, or an empty string if it does not exist."""
        if not self.memory_file.exists():
            return ""
        return self.memory_file.read_text(encoding="utf-8")

    def write_long_term(self, content: str) -> None:
        """Replace the contents of MEMORY.md with the given text."""
        self.memory_file.write_text(content, encoding="utf-8")

    def append_history(self, entry: str) -> None:
        """Append one entry to HISTORY.md, followed by a blank line."""
        trimmed = entry.rstrip()
        with open(self.history_file, "a", encoding="utf-8") as log:
            log.write(trimmed + "\n\n")

    def get_memory_context(self) -> str:
        """Return the long-term memory under a heading, or "" when it is empty."""
        long_term = self.read_long_term()
        if not long_term:
            return ""
        return f"## Long-term Memory\n{long_term}"
# __init__ method · python · L11-L14 (4 LOC)nanobot/agent/memory.py
def __init__(self, workspace: Path):
    """Resolve the memory directory and the two file paths inside it."""
    memory_dir = ensure_dir(workspace / "memory")
    self.memory_dir = memory_dir
    self.memory_file = memory_dir / "MEMORY.md"
    self.history_file = memory_dir / "HISTORY.md"
# read_long_term method · python · L16-L19 (4 LOC)nanobot/agent/memory.py
def read_long_term(self) -> str:
    """Return the text of the memory file, or an empty string if it does not exist."""
    if not self.memory_file.exists():
        return ""
    return self.memory_file.read_text(encoding="utf-8")
# append_history method · python · L24-L26 (3 LOC)nanobot/agent/memory.py
def append_history(self, entry: str) -> None:
    """Append one entry to the history file, followed by a blank line.

    Trailing whitespace of the entry is removed before writing.
    """
    trimmed = entry.rstrip()
    with open(self.history_file, "a", encoding="utf-8") as log:
        log.write(trimmed + "\n\n")
# get_memory_context method · python · L28-L30 (3 LOC)nanobot/agent/memory.py
def get_memory_context(self) -> str:
    """Return the long-term memory under a heading, or "" when it is empty."""
    long_term = self.read_long_term()
    if not long_term:
        return ""
    return f"## Long-term Memory\n{long_term}"
# SkillsLoader class · python · L13-L228 (216 LOC)nanobot/agent/skills.py
class SkillsLoader:
"""
Loader for agent skills.
Skills are markdown files (SKILL.md) that teach the agent how to use
specific tools or perform certain tasks.
"""
def __init__(self, workspace: Path, builtin_skills_dir: Path | None = None):
self.workspace = workspace
self.workspace_skills = workspace / "skills"
self.builtin_skills = builtin_skills_dir or BUILTIN_SKILLS_DIR
def list_skills(self, filter_unavailable: bool = True) -> list[dict[str, str]]:
"""
List all available skills.
Args:
filter_unavailable: If True, filter out skills with unmet requirements.
Returns:
List of skill info dicts with 'name', 'path', 'source'.
"""
skills = []
# Workspace skills (highest priority)
if self.workspace_skills.exists():
for skill_dir in self.workspace_skills.iterdir():
if skill_dir.is_dir():Want this analysis on your repo? https://repobility.com/scan/
__init__ method · python · L21-L24 (4 LOC)nanobot/agent/skills.py
def __init__(self, workspace: Path, builtin_skills_dir: Path | None = None):
self.workspace = workspace
self.workspace_skills = workspace / "skills"
self.builtin_skills = builtin_skills_dir or BUILTIN_SKILLS_DIRlist_skills method · python · L26-L57 (32 LOC)nanobot/agent/skills.py
def list_skills(self, filter_unavailable: bool = True) -> list[dict[str, str]]:
"""
List all available skills.
Args:
filter_unavailable: If True, filter out skills with unmet requirements.
Returns:
List of skill info dicts with 'name', 'path', 'source'.
"""
skills = []
# Workspace skills (highest priority)
if self.workspace_skills.exists():
for skill_dir in self.workspace_skills.iterdir():
if skill_dir.is_dir():
skill_file = skill_dir / "SKILL.md"
if skill_file.exists():
skills.append({"name": skill_dir.name, "path": str(skill_file), "source": "workspace"})
# Built-in skills
if self.builtin_skills and self.builtin_skills.exists():
for skill_dir in self.builtin_skills.iterdir():
if skill_dir.is_dir():
skill_load_skill method · python · L59-L80 (22 LOC)nanobot/agent/skills.py
def load_skill(self, name: str) -> str | None:
"""
Load a skill by name.
Args:
name: Skill name (directory name).
Returns:
Skill content or None if not found.
"""
# Check workspace first
workspace_skill = self.workspace_skills / name / "SKILL.md"
if workspace_skill.exists():
return workspace_skill.read_text(encoding="utf-8")
# Check built-in
if self.builtin_skills:
builtin_skill = self.builtin_skills / name / "SKILL.md"
if builtin_skill.exists():
return builtin_skill.read_text(encoding="utf-8")
return Nonepage 1 / 12next ›