Function bodies 565 total
load_skills_for_context method · python · L82-L99 (18 LOC)nanobot/agent/skills.py
def load_skills_for_context(self, skill_names: list[str]) -> str:
    """
    Render the requested skills as one markdown string for the agent context.

    Args:
        skill_names: Names of the skills to include, in output order.

    Returns:
        One "### Skill: <name>" section per skill whose content is non-empty
        (frontmatter removed), separated by "---" rules; "" if there are none.
    """
    sections = []
    for skill in skill_names:
        raw = self.load_skill(skill)
        if not raw:
            # Skills for which load_skill returns nothing are skipped.
            continue
        body = self._strip_frontmatter(raw)
        sections.append(f"### Skill: {skill}\n\n{body}")
    # Joining an empty list already yields "", so no special case is needed.
    return "\n\n---\n\n".join(sections)
def build_skills_summary(self) -> str:
"""
Build a summary of all skills (name, description, path, availability).
This is used for progressive loading - the agent can read the full
skill content using read_file when needed.
Returns:
XML-formatted skills summary.
"""
all_skills = self.list_skills(filter_unavailable=False)
if not all_skills:
return ""
def escape_xml(s: str) -> str:
return s.replace("&", "&").replace("<", "<").replace(">", ">")
lines = ["<skills>"]
for s in all_skills:
name = escape_xml(s["name"])
path = s["path"]
desc = escape_xml(self._get_skill_description(s["name"]))
skill_meta = self._get_skill_meta(s["name"])
available = self._check_requirements(skill_meta)
lines.append(f" <skill available=\"{str(available).low_get_missing_requirements method · python · L142-L152 (11 LOC)nanobot/agent/skills.py
def _get_missing_requirements(self, skill_meta: dict) -> str:
"""Get a description of missing requirements."""
missing = []
requires = skill_meta.get("requires", {})
for b in requires.get("bins", []):
if not shutil.which(b):
missing.append(f"CLI: {b}")
for env in requires.get("env", []):
if not os.environ.get(env):
missing.append(f"ENV: {env}")
return ", ".join(missing)_get_skill_description method · python · L154-L159 (6 LOC)nanobot/agent/skills.py
def _get_skill_description(self, name: str) -> str:
"""Get the description of a skill from its frontmatter."""
meta = self.get_skill_metadata(name)
if meta and meta.get("description"):
return meta["description"]
return name # Fallback to skill name_strip_frontmatter method · python · L161-L167 (7 LOC)nanobot/agent/skills.py
def _strip_frontmatter(self, content: str) -> str:
"""Remove YAML frontmatter from markdown content."""
if content.startswith("---"):
match = re.match(r"^---\n.*?\n---\n", content, re.DOTALL)
if match:
return content[match.end():].strip()
return content_parse_nanobot_metadata method · python · L169-L175 (7 LOC)nanobot/agent/skills.py
def _parse_nanobot_metadata(self, raw: str) -> dict:
"""Parse skill metadata JSON from frontmatter (supports nanobot and openclaw keys)."""
try:
data = json.loads(raw)
return data.get("nanobot", data.get("openclaw", {})) if isinstance(data, dict) else {}
except (json.JSONDecodeError, TypeError):
return {}_check_requirements method · python · L177-L186 (10 LOC)nanobot/agent/skills.py
def _check_requirements(self, skill_meta: dict) -> bool:
"""Check if skill requirements are met (bins, env vars)."""
requires = skill_meta.get("requires", {})
for b in requires.get("bins", []):
if not shutil.which(b):
return False
for env in requires.get("env", []):
if not os.environ.get(env):
return False
return TrueSame scanner, your repo: https://repobility.com — Repobility
_get_skill_meta method · python · L188-L191 (4 LOC)nanobot/agent/skills.py
def _get_skill_meta(self, name: str) -> dict:
"""Get nanobot metadata for a skill (cached in frontmatter)."""
meta = self.get_skill_metadata(name) or {}
return self._parse_nanobot_metadata(meta.get("metadata", ""))get_always_skills method · python · L193-L201 (9 LOC)nanobot/agent/skills.py
def get_always_skills(self) -> list[str]:
    """
    Get skills marked as always-on among those returned by
    ``list_skills(filter_unavailable=True)``.

    A skill qualifies when "always" is enabled either inside its
    nanobot/openclaw metadata JSON or as a top-level frontmatter key.

    Returns:
        Names of the qualifying skills, in list_skills order.
    """
    def _flag_enabled(flag) -> bool:
        # Top-level frontmatter values are plain strings, so "always: false"
        # must not count as enabled merely because the string is non-empty.
        if isinstance(flag, str):
            return flag.strip().lower() in ("true", "yes", "on", "1")
        return bool(flag)

    result = []
    for s in self.list_skills(filter_unavailable=True):
        meta = self.get_skill_metadata(s["name"]) or {}
        skill_meta = self._parse_nanobot_metadata(meta.get("metadata", ""))
        if _flag_enabled(skill_meta.get("always")) or _flag_enabled(meta.get("always")):
            result.append(s["name"])
    return result
def get_skill_metadata(self, name: str) -> dict | None:
"""
Get metadata from a skill's frontmatter.
Args:
name: Skill name.
Returns:
Metadata dict or None.
"""
content = self.load_skill(name)
if not content:
return None
if content.startswith("---"):
match = re.match(r"^---\n(.*?)\n---", content, re.DOTALL)
if match:
# Simple YAML parsing
metadata = {}
for line in match.group(1).split("\n"):
if ":" in line:
key, value = line.split(":", 1)
metadata[key.strip()] = value.strip().strip('"\'')
return metadata
return NoneSubagentManager class · python · L21-L259 (239 LOC)nanobot/agent/subagent.py
class SubagentManager:
"""
Manages background subagent execution.
Subagents are lightweight agent instances that run in the background
to handle specific tasks. They share the same LLM provider but have
isolated context and a focused system prompt.
"""
def __init__(
self,
provider: LLMProvider,
workspace: Path,
bus: MessageBus,
model: str | None = None,
temperature: float = 0.7,
max_tokens: int = 4096,
brave_api_key: str | None = None,
exec_config: "ExecToolConfig | None" = None,
restrict_to_workspace: bool = False,
):
from nanobot.config.schema import ExecToolConfig
self.provider = provider
self.workspace = workspace
self.bus = bus
self.model = model or provider.get_default_model()
self.temperature = temperature
self.max_tokens = max_tokens
self.brave_api_key = brave_api_key
self.exec_config = __init__ method · python · L30-L52 (23 LOC)nanobot/agent/subagent.py
def __init__(
    self,
    provider: LLMProvider,
    workspace: Path,
    bus: MessageBus,
    model: str | None = None,
    temperature: float = 0.7,
    max_tokens: int = 4096,
    brave_api_key: str | None = None,
    exec_config: "ExecToolConfig | None" = None,
    restrict_to_workspace: bool = False,
):
    """
    Store the collaborators and settings used by spawned subagents.

    Args:
        provider: LLM provider; also supplies the default model name.
        workspace: Workspace directory.
        bus: Message bus.
        model: Model name; falls back to the provider default when falsy.
        temperature: Sampling temperature.
        max_tokens: Token limit.
        brave_api_key: Optional Brave API key.
        exec_config: Exec tool configuration; a default ExecToolConfig is
            created when omitted.
        restrict_to_workspace: Stored flag for confining tools to the
            workspace.
    """
    # Imported here rather than at module level, as in the original.
    from nanobot.config.schema import ExecToolConfig

    # Collaborators.
    self.provider = provider
    self.workspace = workspace
    self.bus = bus

    # Generation settings.
    self.model = model if model else provider.get_default_model()
    self.temperature = temperature
    self.max_tokens = max_tokens

    # Tool settings.
    self.brave_api_key = brave_api_key
    if not exec_config:
        exec_config = ExecToolConfig()
    self.exec_config = exec_config
    self.restrict_to_workspace = restrict_to_workspace

    # Background tasks, keyed by string id.
    self._running_tasks: dict[str, asyncio.Task[None]] = {}
async def spawn(
self,
task: str,
label: str | None = None,
origin_channel: str = "cli",
origin_chat_id: str = "direct",
) -> str:
"""
Spawn a subagent to execute a task in the background.
Args:
task: The task description for the subagent.
label: Optional human-readable label for the task.
origin_channel: The channel to announce results to.
origin_chat_id: The chat ID to announce results to.
Returns:
Status message indicating the subagent was started.
"""
task_id = str(uuid.uuid4())[:8]
display_label = label or task[:30] + ("..." if len(task) > 30 else "")
origin = {
"channel": origin_channel,
"chat_id": origin_chat_id,
}
# Create background task
bg_task = asyncio.create_task(
self._run_subagent(task_id, task, display_labe_run_subagent method · python · L93-L186 (94 LOC)nanobot/agent/subagent.py
async def _run_subagent(
self,
task_id: str,
task: str,
label: str,
origin: dict[str, str],
) -> None:
"""Execute the subagent task and announce the result."""
logger.info("Subagent [{}] starting task: {}", task_id, label)
try:
# Build subagent tools (no message tool, no spawn tool)
tools = ToolRegistry()
allowed_dir = self.workspace if self.restrict_to_workspace else None
tools.register(ReadFileTool(workspace=self.workspace, allowed_dir=allowed_dir))
tools.register(WriteFileTool(workspace=self.workspace, allowed_dir=allowed_dir))
tools.register(EditFileTool(workspace=self.workspace, allowed_dir=allowed_dir))
tools.register(ListDirTool(workspace=self.workspace, allowed_dir=allowed_dir))
tools.register(ExecTool(
working_dir=str(self.workspace),
timeout=self.exec_config.timeout,
_announce_result method · python · L188-L218 (31 LOC)nanobot/agent/subagent.py
async def _announce_result(
self,
task_id: str,
label: str,
task: str,
result: str,
origin: dict[str, str],
status: str,
) -> None:
"""Announce the subagent result to the main agent via the message bus."""
status_text = "completed successfully" if status == "ok" else "failed"
announce_content = f"""[Subagent '{label}' {status_text}]
Task: {task}
Result:
{result}
Summarize this naturally for the user. Keep it brief (1-2 sentences). Do not mention technical details like "subagent" or task IDs."""
# Inject as system message to trigger main agent
msg = InboundMessage(
channel="system",
sender_id="subagent",
chat_id=f"{origin['channel']}:{origin['chat_id']}",
content=announce_content,
)
await self.bus.publish_inbound(msg)
logger.debug("Subagent [{}] announced result to {}:{}", task_id, orAll rows above produced by Repobility · https://repobility.com
_build_subagent_prompt method · python · L220-L255 (36 LOC)nanobot/agent/subagent.py
def _build_subagent_prompt(self, task: str) -> str:
"""Build a focused system prompt for the subagent."""
from datetime import datetime
import time as _time
now = datetime.now().strftime("%Y-%m-%d %H:%M (%A)")
tz = _time.strftime("%Z") or "UTC"
return f"""# Subagent
## Current Time
{now} ({tz})
You are a subagent spawned by the main agent to complete a specific task.
## Rules
1. Stay focused - complete only the assigned task, nothing else
2. Your final response will be reported back to the main agent
3. Do not initiate conversations or take on side tasks
4. Be concise but informative in your findings
## What You Can Do
- Read and write files in the workspace
- Execute shell commands
- Search the web and fetch web pages
- Complete the task thoroughly
## What You Cannot Do
- Send messages directly to users (no message tool available)
- Spawn other subagents
- Access the main agent's conversation history
## Workspace
Your workspacget_running_count method · python · L257-L259 (3 LOC)nanobot/agent/subagent.py
def get_running_count(self) -> int:
    """Return how many subagent tasks are currently held in _running_tasks."""
    tracked = self._running_tasks
    return len(tracked)
class Tool(ABC):
"""
Abstract base class for agent tools.
Tools are capabilities that the agent can use to interact with
the environment, such as reading files, executing commands, etc.
"""
_TYPE_MAP = {
"string": str,
"integer": int,
"number": (int, float),
"boolean": bool,
"array": list,
"object": dict,
}
@property
@abstractmethod
def name(self) -> str:
"""Tool name used in function calls."""
pass
@property
@abstractmethod
def description(self) -> str:
"""Description of what the tool does."""
pass
@property
@abstractmethod
def parameters(self) -> dict[str, Any]:
"""JSON Schema for tool parameters."""
pass
@abstractmethod
async def execute(self, **kwargs: Any) -> str:
"""
Execute the tool with given parameters.
Args:
**kwargs: Tool-specific paramexecute method · python · L43-L53 (11 LOC)nanobot/agent/tools/base.py
async def execute(self, **kwargs: Any) -> str:
    """
    Run the tool with the supplied parameters.

    This base implementation has no behavior of its own; concrete tools
    provide the real work.

    Args:
        **kwargs: Tool-specific parameters.

    Returns:
        String result of the tool execution.
    """
def validate_params(self, params: dict[str, Any]) -> list[str]:
    """
    Check tool parameters against the tool's JSON schema.

    Args:
        params: Parameter values to validate.

    Returns:
        List of error messages; empty when the parameters are valid.

    Raises:
        ValueError: If the schema declares a top-level type other than
            "object".
    """
    schema = self.parameters or {}
    root_type = schema.get("type", "object")
    if root_type != "object":
        raise ValueError(f"Schema must be object type, got {schema.get('type')!r}")
    # Pin the root type so a schema that omits "type" is still validated
    # as an object.
    root_schema = {**schema, "type": "object"}
    return self._validate(params, root_schema, "")
def _validate(self, val: Any, schema: dict[str, Any], path: str) -> list[str]:
t, label = schema.get("type"), path or "parameter"
if t in self._TYPE_MAP and not isinstance(val, self._TYPE_MAP[t]):
return [f"{label} should be {t}"]
errors = []
if "enum" in schema and val not in schema["enum"]:
errors.append(f"{label} must be one of {schema['enum']}")
if t in ("integer", "number"):
if "minimum" in schema and val < schema["minimum"]:
errors.append(f"{label} must be >= {schema['minimum']}")
if "maximum" in schema and val > schema["maximum"]:
errors.append(f"{label} must be <= {schema['maximum']}")
if t == "string":
if "minLength" in schema and len(val) < schema["minLength"]:
errors.append(f"{label} must be at least {schema['minLength']} chars")
if "maxLength" in schema and len(val) > schema["maxLength"]:
to_schema method · python · L93-L102 (10 LOC)nanobot/agent/tools/base.py
def to_schema(self) -> dict[str, Any]:
    """
    Describe the tool in OpenAI function-schema form.

    Returns:
        {"type": "function", "function": {...}} where the inner mapping
        carries the tool's name, description and parameter schema.
    """
    function_spec = {
        "name": self.name,
        "description": self.description,
        "parameters": self.parameters,
    }
    return {"type": "function", "function": function_spec}
def get_platform() -> str:
    """
    Get the current platform as "macos", "linux" or "windows".

    Any system name other than Darwin, Linux or Windows falls back to
    "macos".
    """
    names = {"darwin": "macos", "linux": "linux", "windows": "windows"}
    return names.get(platform.system().lower(), "macos")  # default to macos
find_browser_path function · python · L61-L90 (30 LOC)nanobot/agent/tools/browser_manager.py
def find_browser_path(browser: str = "chrome") -> str | None:
"""Find the browser executable path."""
plat = get_platform()
# Check configured path first
config_path = os.environ.get(f"NANOBOT_{browser.upper()}_PATH")
if config_path and os.path.exists(config_path):
return config_path
# Check known paths
browser_config = BROWSERS.get(browser.lower())
if browser_config:
path = browser_config.get(plat)
if path and os.path.exists(path):
return path
# Try system PATH
try:
result = subprocess.run(
["which", browser.lower().replace(" ", "-")],
capture_output=True,
text=True,
timeout=5
)
if result.returncode == 0 and result.stdout.strip():
return result.stdout.strip()
except Exception:
pass
return Noneget_chrome_processes function · python · L93-L123 (31 LOC)nanobot/agent/tools/browser_manager.py
def get_chrome_processes(cdp_port: int = DEFAULT_CDP_PORT) -> list[dict[str, Any]]:
"""Get list of Chrome processes running with remote debugging."""
processes = []
try:
# Use ps to find Chrome processes
system = platform.system().lower()
if system == "darwin" or system == "linux":
result = subprocess.run(
["ps", "aux"],
capture_output=True,
text=True,
timeout=5
)
for line in result.stdout.split("\n"):
if "Chrome" in line or "chrome" in line:
parts = line.split()
if len(parts) >= 2:
try:
pid = int(parts[1])
# Check if this process has our port
if f"--remote-debugging-port={cdp_port}" in line or f"--remote-debugging-port " in line:
processes.append({
BrowserManager class · python · L126-L315 (190 LOC)nanobot/agent/tools/browser_manager.py
class BrowserManager:
"""Manages Chrome browser instances."""
def __init__(self, workspace: Path | None = None):
self.workspace = workspace or Path.home() / ".nanobot" / "workspace"
self.browser_dir = self.workspace / "browser"
self.browser_dir.mkdir(parents=True, exist_ok=True)
self.profiles = DEFAULT_PROFILES.copy()
def get_profile_config(self, profile: str) -> dict[str, Any]:
"""Get profile configuration."""
return self.profiles.get(profile, {
"port": DEFAULT_CDP_PORT,
"browser": "chrome",
"color": "#FF4500"
})
def list_profiles(self) -> dict[str, Any]:
"""List all available profiles."""
return {
"profiles": self.profiles
}
def get_user_data_dir(self, profile: str = "nanobot") -> Path:
"""Get the user data directory for a profile."""
return self.browser_dir / f"profile_{profile}"
async def start(
self,
__init__ method · python · L129-L133 (5 LOC)nanobot/agent/tools/browser_manager.py
def __init__(self, workspace: Path | None = None):
    """
    Prepare the browser directory and the profile table.

    Args:
        workspace: Workspace root; defaults to ~/.nanobot/workspace.
    """
    if not workspace:
        workspace = Path.home() / ".nanobot" / "workspace"
    self.workspace = workspace
    # Browser data lives in a "browser" folder under the workspace; it is
    # created on construction if missing.
    self.browser_dir = self.workspace / "browser"
    self.browser_dir.mkdir(parents=True, exist_ok=True)
    # Copy of the module defaults (shallow, via .copy()).
    self.profiles = DEFAULT_PROFILES.copy()
def get_profile_config(self, profile: str) -> dict[str, Any]:
    """
    Get the configuration of a named profile.

    Args:
        profile: Profile name.

    Returns:
        The stored configuration, or a fallback (DEFAULT_CDP_PORT, chrome,
        color "#FF4500") when the profile is unknown.
    """
    if profile in self.profiles:
        return self.profiles[profile]
    fallback = {
        "port": DEFAULT_CDP_PORT,
        "browser": "chrome",
        "color": "#FF4500",
    }
    return fallback
def list_profiles(self) -> dict[str, Any]:
    """Return the profile table wrapped under a "profiles" key."""
    listing = {"profiles": self.profiles}
    return listing
def get_user_data_dir(self, profile: str = "nanobot") -> Path:
    """
    Get the user data directory for a profile.

    Args:
        profile: Profile name.

    Returns:
        ``browser_dir / "profile_<profile>"``; the directory is not created.
    """
    folder_name = f"profile_{profile}"
    return self.browser_dir / folder_name
async def start(
self,
browser: str = "chrome",
port: int = DEFAULT_CDP_PORT,
profile: str = "nanobot",
headless: bool = False,
) -> dict[str, Any]:
"""Start a browser instance."""
# Find browser executable
browser_path = find_browser_path(browser)
if not browser_path:
return {
"success": False,
"error": f"Browser '{browser}' not found. Install it or set NANOBOT_{browser.upper()}_PATH"
}
# Check if already running on this port
try:
import httpx
response = httpx.get(f"http://127.0.0.1:{port}/json/version", timeout=2)
if response.status_code == 200:
return {
"success": True,
"message": f"Browser already running on port {port}",
"port": port,
"profile": profile,
}
except Exception:
Repobility · open methodology · https://repobility.com/research/
stop method · python · L240-L288 (49 LOC)nanobot/agent/tools/browser_manager.py
async def stop(self, port: int = DEFAULT_CDP_PORT) -> dict[str, Any]:
"""Stop browser instance on the given port."""
try:
# Try to gracefully close via CDP first
import httpx
try:
# Get the WebSocket URL
response = httpx.get(f"http://127.0.0.1:{port}/json/version", timeout=2)
if response.status_code == 200:
data = response.json()
ws_url = data.get("webSocketDebuggerUrl", "")
if ws_url:
# Try to close via CDP
import websockets
try:
async with websockets.connect(ws_url) as ws:
await ws.send('{"id":1,"method":"Browser.close"}')
except Exception:
pass
except Exception:
pass
# Kill process by port
status method · python · L290-L315 (26 LOC)nanobot/agent/tools/browser_manager.py
async def status(self, port: int = DEFAULT_CDP_PORT) -> dict[str, Any]:
    """
    Check browser status by probing the CDP HTTP endpoint on localhost.

    Args:
        port: Remote-debugging port to probe.

    Returns:
        A dict with "success" (always True), "running" and "port". When the
        endpoint answers with HTTP 200 it also carries "browser" and
        "webSocket" taken from the /json/version payload.
    """
    not_running = {
        "success": True,
        "running": False,
        "port": port,
    }
    try:
        import httpx

        # Use the async client: a synchronous httpx.get here would block the
        # event loop for up to the full timeout while the probe is pending.
        async with httpx.AsyncClient(timeout=2) as client:
            response = await client.get(f"http://127.0.0.1:{port}/json/version")
        if response.status_code != 200:
            return not_running
        data = response.json()
        return {
            "success": True,
            "running": True,
            "port": port,
            "browser": data.get("Browser", "Unknown"),
            "webSocket": data.get("webSocketDebuggerUrl", ""),
        }
    except Exception:
        # Best-effort probe: any failure (connection refused, timeout,
        # malformed payload) is reported as "not running".
        return not_running
class BrowserTool(Tool):
"""Browser automation using browser-use CLI (same as OpenClaw)."""
name = "browser"
description = """Browser automation using browser-use CLI.
**Actions:**
- open: {"url": "https://..."} - Open a URL
- state - Get clickable elements with indices
- click: {"index": 1} - Click element by index
- input: {"index": 1, "text": "hello"} - Click and type
- screenshot - Take screenshot
- close - Close browser
**Workflow:**
1. browser({"action": "open", "url": "https://xiaohongshu.com"})
2. browser({"action": "state"}) - Get elements with indices
3. browser({"action": "click", "index": 5}) - Click element
**Example:**
- browser({"action": "open", "url": "https://linkedin.com/jobs"})"""
def __init__(self, workspace: Path):
self.workspace = workspace
async def execute(self, action: str, **kwargs) -> str:
"""Execute a browser action using browser-use CLI."""
try:
cmd = [BROWSER_USE_CMD, "--browser", "real", "--heexecute method · python · L43-L97 (55 LOC)nanobot/agent/tools/browser.py
async def execute(self, action: str, **kwargs) -> str:
"""Execute a browser action using browser-use CLI."""
try:
cmd = [BROWSER_USE_CMD, "--browser", "real", "--headed"]
if action == "open":
url = kwargs.get("url", "")
if not url:
return "Error: url is required"
cmd.extend(["open", url])
elif action == "state":
cmd.append("state")
elif action == "click":
index = kwargs.get("index", 0)
cmd.extend(["click", str(index)])
elif action == "input":
index = kwargs.get("index", 0)
text = kwargs.get("text", "")
cmd.extend(["input", str(index), text])
elif action == "screenshot":
path = str(self.workspace / "screenshot.png")
cmd.extend(["screenshot", path])
elif action == "close":
__init__ method · python · L15-L22 (8 LOC)nanobot/agent/tools/cdp_client.py
def __init__(self, host: str = "127.0.0.1", port: int = 18800):
self.host = host
self.port = port
self.ws: Optional[websockets.WebSocketClientProtocol] = None
self.target_id: Optional[str] = None
self.session_id: Optional[str] = None
self.ref_map: dict = {} # Maps refs (e1, e2) to nodeIds
self._id = 0connect method · python · L24-L61 (38 LOC)nanobot/agent/tools/cdp_client.py
async def connect(self):
"""Connect to Chrome."""
# Get the WebSocket URL
import httpx
async with httpx.AsyncClient() as client:
resp = await client.get(f"http://{self.host}:{self.port}/json/version")
data = resp.json()
ws_url = data["webSocketDebuggerUrl"]
# Connect to WebSocket
self.ws = await ws_client.connect(ws_url)
# Get targets and attach to the first one
await self._send("Target.getTargets", {})
response = await self._recv()
targets = response.get("result", {}).get("targetInfos", [])
# Find the main page
for target in targets:
if target.get("type") == "page":
self.target_id = target.get("targetId")
break
if not self.target_id:
# Create a new page
result = await self._send_and_wait("Target.createTarget", {"url": "about:blank"})
self.target_id = result.get_send method · python · L63-L75 (13 LOC)nanobot/agent/tools/cdp_client.py
async def _send(self, method: str, params: dict = None):
"""Send a CDP command."""
if params is None:
params = {}
self._id += 1
message = {
"id": self._id,
"method": method,
"params": params
}
if self.session_id:
message["sessionId"] = self.session_id
await self.ws.send(json.dumps(message))_recv method · python · L77-L80 (4 LOC)nanobot/agent/tools/cdp_client.py
async def _recv(self):
"""Receive a CDP response."""
response = await self.ws.recv()
return json.loads(response)Same scanner, your repo: https://repobility.com — Repobility
_send_and_wait method · python · L82-L88 (7 LOC)nanobot/agent/tools/cdp_client.py
async def _send_and_wait(self, method: str, params: dict = None):
"""Send a command and wait for response."""
await self._send(method, params)
while True:
response = await self._recv()
if response.get("id") == self._id:
return responsenavigate method · python · L90-L108 (19 LOC)nanobot/agent/tools/cdp_client.py
async def navigate(self, url: str):
    """
    Navigate to URL.

    Inputs without a scheme are expanded: a bare word ("example") becomes
    "https://www.example.com", a host without "www." gets "https://www."
    prepended, and a host starting with "www." gets "https://". Inputs that
    already carry a scheme ("https://...", "file://...") or start with
    "about:" are sent unchanged, as is an empty URL.

    If the first attempt raises, the client reconnects and retries once;
    an error from the retry propagates to the caller.

    Args:
        url: Address or shorthand to open.

    Returns:
        The response to the Page.navigate command.
    """
    import re  # local import keeps the scheme check self-contained

    # Detect a real "<scheme>://" prefix instead of testing for the letters
    # "http", which misclassified hosts such as "httpbin.org" and expanded
    # "about:blank" into a bogus https URL.
    if url and not (
        re.match(r"[A-Za-z][A-Za-z0-9+.-]*://", url) or url.startswith("about:")
    ):
        if "." not in url:
            # Maybe it's a domain without TLD, try adding .com
            url = f"https://www.{url}.com"
        elif url.startswith("www."):
            url = f"https://{url}"
        else:
            url = f"https://www.{url}"
    try:
        return await self._send_and_wait("Page.navigate", {"url": url})
    except Exception:
        # Try to reconnect and retry once
        await self._reconnect()
        return await self._send_and_wait("Page.navigate", {"url": url})
async def reload(self):
    """Issue Page.reload and wait for its response; the response is discarded."""
    _ = await self._send_and_wait("Page.reload")
async def get_document(self):
    """
    Fetch the document root via DOM.getDocument.

    Returns:
        The "root" entry of the response's "result", or {} if absent.
    """
    reply = await self._send_and_wait("DOM.getDocument")
    payload = reply.get("result", {})
    return payload.get("root", {})
async def query_selector(self, selector: str):
    """
    Resolve a CSS selector to a node id, searching from the document root.

    Args:
        selector: CSS selector.

    Returns:
        The "nodeId" reported by DOM.querySelector, or None when the
        response carries none.
    """
    root = await self.get_document()
    query = {"nodeId": root.get("nodeId"), "selector": selector}
    reply = await self._send_and_wait("DOM.querySelector", query)
    return reply.get("result", {}).get("nodeId")
async def click_element(self, selector: str):
"""Click an element."""
node_id = await self.query_selector(selector)
if not node_id:
return {"error": "Element not found"}
# Get box model for clicking
result = await self._send_and_wait("DOM.getBoxModel", {"nodeId": node_id})
model = result.get("result", {}).get("model", {})
if model:
content = model.get("content", [])
if len(content) >= 4:
x = (content[0] + content[2]) / 2
y = (content[1] + content[5]) / 2
# Input mouse click
await self._send("Input.dispatchMouseEvent", {
"type": "mousePressed",
"x": x,
"y": y,
"button": "left",
"clickCount": 1
})
await self._send("Input.dispatchMouseEvent", {
"type": "mouseReleased",
type_text method · python · L161-L183 (23 LOC)nanobot/agent/tools/cdp_client.py
async def type_text(self, selector: str, text: str):
    """
    Focus an element and type text into it one character at a time.

    Args:
        selector: CSS selector of the target element.
        text: Text to type.

    Returns:
        {"success": True}, or {"error": "Element not found"} when the
        selector resolves to no node.
    """
    node_id = await self.query_selector(selector)
    if not node_id:
        return {"error": "Element not found"}

    # Focus first; the key events below carry no target of their own.
    await self._send_and_wait("DOM.focus", {"nodeId": node_id})

    # Each character is a keyDown/keyUp pair, sent without awaiting replies.
    for char in text:
        for phase in ("keyDown", "keyUp"):
            await self._send("Input.dispatchKeyEvent", {
                "type": phase,
                "text": char,
                "key": char
            })
    return {"success": True}
async def click_by_ref(self, ref: str):
"""Click an element by ref (like OpenClaw).
Uses accessibility tree to find and click elements.
All refs are like e1, e2, e3... (no p refs).
"""
# Extract index from ref (e1 -> 0, e2 -> 1)
try:
prefix = ref[0].lower()
if prefix != 'e':
return {"error": f"Invalid ref format: {ref}. Use e1, e2..."}
idx = int(ref[1:]) - 1
except:
return {"error": f"Invalid ref format: {ref}. Use e1, e2..."}
# Find element by index and click using JavaScript
js_code = f"""
(function() {{
// Get all clickable/interactive elements
var selectors = [
'a', 'button', '[role="button"]', '[role="link"]',
'input[type="button"]', 'input[type="submit"]', 'input[type="checkbox"]', 'input[type="radio"]',
'[onclick]', '[data-clickable="true"]'
];
All rows above produced by Repobility · https://repobility.com
type_by_ref method · python · L272-L318 (47 LOC)nanobot/agent/tools/cdp_client.py
async def type_by_ref(self, ref: str, text: str):
"""Type text into an element by ref (e1, e2, etc)."""
# Extract index from ref (e1 -> 0, e2 -> 1)
try:
idx = int(ref[1:]) - 1
except:
return {"error": f"Invalid ref format: {ref}"}
# Use JavaScript to type
import json
js_code = f"""
(function() {{
var refs = document.querySelectorAll('a, button, input, [onclick], [role="button"], img, div[data-clickable="true"]');
var seen = new Set();
var count = 0;
for (var i = 0; i < refs.length; i++) {{
var el = refs[i];
var rect = el.getBoundingClientRect();
if (rect.width < 5 || rect.height < 5 || el.hidden || el.disabled) continue;
var key = el.tagName + '-' + el.innerText.substring(0, 30) + '-' + rect.left + '-' + rect.top;
if (seen.has(key)) continue;
seen.add(kpress_key method · python · L320-L346 (27 LOC)nanobot/agent/tools/cdp_client.py
async def press_key(self, key: str):
    """
    Press and release a single key.

    "Return" and "Esc" are accepted as aliases for "Enter" and "Escape";
    every other name is passed through unchanged.

    Args:
        key: Key name.
    """
    # Only the aliases need an entry; any other name maps to itself.
    aliases = {"Return": "Enter", "Esc": "Escape"}
    resolved = aliases.get(key, key)
    for phase in ("keyDown", "keyUp"):
        await self._send_and_wait("Input.dispatchKeyEvent", {
            "type": phase,
            "key": resolved,
            "windowsVirtualKeyCode": 0
        })
async def get_content(self):
    """
    Get the page's visible text (document.body.innerText).

    If the first attempt raises, the client reconnects and tries once more;
    an error from the second attempt propagates.

    Returns:
        The evaluated text, or "" when the response carries no value.
    """
    async def read_body_text():
        reply = await self._send_and_wait("Runtime.evaluate", {
            "expression": "document.body.innerText",
            "returnByValue": True
        })
        return reply.get("result", {}).get("result", {}).get("value", "")

    try:
        return await read_body_text()
    except Exception:
        # Try to reconnect and retry
        await self._reconnect()
        return await read_body_text()