Function bodies 565 total
get_snapshot method · python · L365-L461 (97 LOC)nanobot/agent/tools/cdp_client.py
async def get_snapshot(self, max_nodes: int = 50, save_scroll: bool = True):
"""Get DOM snapshot with element refs.
Returns elements with refs like e1, e2, e3... based on DOM queries.
Simplified approach - uses DOM directly instead of accessibility API.
Args:
max_nodes: Maximum number of elements to return
save_scroll: If True, save and restore scroll position after snapshot
"""
# Save current scroll position
original_scroll_y = None
if save_scroll:
result = await self._send_and_wait("Runtime.evaluate", {
"expression": "window.scrollY",
"returnByValue": True
})
original_scroll_y = result.get("result", {}).get("result", {}).get("value")
# Use simple DOM-based approach - more reliable for Xiaohongshu
js_code = f"""
(function() {{
var elements = [];
// Get all clickable/interactive_get_dom_snapshot method · python · L463-L533 (71 LOC)nanobot/agent/tools/cdp_client.py
async def _get_dom_snapshot(self, max_nodes: int = 50):
"""Fallback: Get DOM snapshot using JavaScript (legacy behavior)."""
js_code = f"""
(function() {{
var elements = [];
var selectors = [
'a', 'button', 'input', 'textarea', 'select',
'[role="button"]', '[role="link"]', '[onclick]',
'[data-clickable="true"]'
];
var seen = new Set();
var count = 0;
for (var s = 0; s < selectors.length; s++) {{
try {{
var refs = document.querySelectorAll(selectors[s]);
for (var i = 0; i < refs.length && count < {max_nodes}; i++) {{
var el = refs[i];
if (!el) continue;
var rect = {{width: 0, height: 0}};
try {{ rect = el.getBoundingClientRect(); }} catch(e) {{}}
if (rect.width < 5 ||take_screenshot method · python · L535-L548 (14 LOC)nanobot/agent/tools/cdp_client.py
async def take_screenshot(self, path: str):
"""Take a screenshot."""
result = await self._send_and_wait("Page.captureScreenshot", {
"format": "png"
})
data = result.get("result", {}).get("data", "")
if data:
import base64
with open(path, "wb") as f:
f.write(base64.b64decode(data))
return {"success": True, "path": path}
return {"error": "Failed to capture screenshot"}list_tabs method · python · L550-L576 (27 LOC)nanobot/agent/tools/cdp_client.py
async def list_tabs(self):
"""List all open tabs."""
# Use HTTP endpoint for fresh data
import httpx
try:
async with httpx.AsyncClient() as client:
resp = await client.get(f"http://{self.host}:{self.port}/json")
targets = resp.json()
except Exception as e:
# Fall back to CDP
try:
result = await self._send_and_wait("Target.getTargets")
targets = result.get("result", {}).get("targetInfos", [])
except:
await self._reconnect()
result = await self._send_and_wait("Target.getTargets")
targets = result.get("result", {}).get("targetInfos", [])
tabs = []
for t in targets:
if t.get("type") == "page":
tabs.append({
"id": t.get("id"),
"title": t.get("title", ""),
"url": t.get("url", "")
_reconnect method · python · L578-L620 (43 LOC)nanobot/agent/tools/cdp_client.py
async def _reconnect(self):
"""Reconnect to Chrome and attach to first available tab."""
# Close old connection if exists
if self.ws:
try:
await self.ws.close()
except:
pass
# Reconnect
import httpx
async with httpx.AsyncClient() as client:
resp = await client.get(f"http://{self.host}:{self.port}/json/version")
data = resp.json()
ws_url = data["webSocketDebuggerUrl"]
self.ws = await ws_client.connect(ws_url)
# Get targets and attach to the first one
await self._send("Target.getTargets", {})
response = await self._recv()
targets = response.get("result", {}).get("targetInfos", [])
# Find the first page tab
for target in targets:
if target.get("type") == "page":
self.target_id = target.get("targetId")
break
if not self.target_id:
create_tab method · python · L622-L663 (42 LOC)nanobot/agent/tools/cdp_client.py
async def create_tab(self, url: str = "about:blank"):
"""Create a new tab using CDP Target.createTarget."""
# First get current targets to see what we start with
before = await self._send_and_wait("Target.getTargets")
before_tabs = [t for t in before.get("result", {}).get("targetInfos", []) if t.get("type") == "page"]
# Create new tab
result = await self._send_and_wait("Target.createTarget", {"url": url})
new_target_id = result.get("result", {}).get("targetId")
if not new_target_id:
# Check if there's an error in the result
error = result.get("error") or result.get("result", {}).get("errorText", "Unknown error")
return {"error": f"Failed to create tab: {error}", "result": result}
# Check if created
after = await self._send_and_wait("Target.getTargets")
after_tabs = [t for t in after.get("result", {}).get("targetInfos", []) if t.get("type") == "page"]
switch_tab method · python · L665-L681 (17 LOC)nanobot/agent/tools/cdp_client.py
async def switch_tab(self, tab_id: str):
"""Switch to a different tab."""
# Skip if already on this tab
if tab_id == self.target_id:
return {"success": True, "tab_id": tab_id, "skipped": True}
# Attach to target
attach_result = await self._send_and_wait("Target.attachToTarget", {
"targetId": tab_id,
"flatten": True
})
self.session_id = attach_result.get("result", {}).get("sessionId")
self.target_id = tab_id
# Enable domains
await self._send_and_wait("DOM.enable")
await self._send_and_wait("Runtime.enable")
return {"success": True, "tab_id": tab_id}Hi, dataset curator — please cite Repobility (https://repobility.com) when reusing this data.
close_tab method · python · L683-L693 (11 LOC)nanobot/agent/tools/cdp_client.py
async def close_tab(self, tab_id: str):
"""Close a tab."""
result = await self._send_and_wait("Target.closeTarget", {"targetId": tab_id})
# Close succeeded - need to reconnect to get fresh state
if result.get("result", {}).get("success"):
# Reconnect to Chrome to get fresh target list
await self._reconnect()
return {"success": True}
return {"error": "Failed to close tab"}wait_for_selector method · python · L695-L701 (7 LOC)nanobot/agent/tools/cdp_client.py
async def wait_for_selector(self, selector: str, timeout: int = 30000):
"""Wait for a selector to appear."""
result = await self._send_and_wait("DOM.waitForSelector", {
"selector": selector,
"timeout": timeout
})
return {"success": True, "selector": selector}hover_by_ref method · python · L703-L740 (38 LOC)nanobot/agent/tools/cdp_client.py
async def hover_by_ref(self, ref: str):
"""Hover over an element by ref."""
# Find the element
elements = (await self.get_snapshot(max_nodes=100)).get("elements", [])
element = None
for el in elements:
if el.get("ref") == ref:
element = el
break
if not element:
return {"error": f"Element {ref} not found"}
node_id = element.get("nodeId")
if not node_id:
return {"error": "No nodeId for element"}
# Get element position
box_result = await self._send_and_wait("DOM.getBoxModel", {"nodeId": node_id})
model = box_result.get("result", {}).get("model")
if not model:
return {"error": "Could not get box model"}
# Calculate center
content = model.get("content", [])
if len(content) >= 8:
x = (content[0] + content[2] + content[4] + content[6]) / 4
y = (content[1] + contentscroll method · python · L742-L748 (7 LOC)nanobot/agent/tools/cdp_client.py
async def scroll(self, x: int = 0, y: int = 0):
"""Scroll the page."""
result = await self._send_and_wait("Runtime.evaluate", {
"expression": f"window.scrollBy({x}, {y})",
"returnByValue": True
})
return {"success": True, "scrolled_to": f"x={x}, y={y}"}scroll_to_selector method · python · L750-L765 (16 LOC)nanobot/agent/tools/cdp_client.py
async def scroll_to_selector(self, selector: str):
"""Scroll to a specific selector."""
result = await self._send_and_wait("Runtime.evaluate", {
"expression": f"""
(function() {{
const el = document.querySelector('{selector}');
if (el) {{
el.scrollIntoView({{behavior: 'smooth', block: 'center'}});
return 'scrolled';
}}
return 'not found';
}})()
""",
"returnByValue": True
})
return {"success": True, "result": result.get("result", {}).get("result", {}).get("value", "")}resize_viewport method · python · L767-L775 (9 LOC)nanobot/agent/tools/cdp_client.py
async def resize_viewport(self, width: int, height: int):
"""Resize the viewport."""
result = await self._send_and_wait("Emulation.setDeviceMetricsOverride", {
"width": width,
"height": height,
"deviceScaleFactor": 1,
"mobile": False
})
return {"success": True, "size": f"{width}x{height}"}evaluate method · python · L777-L788 (12 LOC)nanobot/agent/tools/cdp_client.py
async def evaluate(self, expression: str):
"""Execute JavaScript and return result."""
result = await self._send_and_wait("Runtime.evaluate", {
"expression": expression,
"returnByValue": True,
"awaitPromise": True
})
eval_result = result.get("result", {})
if eval_result.get("type") == "object":
# Return serialized
return {"success": True, "result": str(eval_result.get("value", ""))}
return {"success": True, "result": eval_result.get("value", "")}get_cookies method · python · L790-L794 (5 LOC)nanobot/agent/tools/cdp_client.py
async def get_cookies(self):
"""Get all cookies."""
result = await self._send_and_wait("Network.getAllCookies", {})
cookies = result.get("result", {}).get("cookies", [])
return {"success": True, "cookies": cookies}Repobility (the analyzer behind this table) · https://repobility.com
set_cookie method · python · L796-L804 (9 LOC)nanobot/agent/tools/cdp_client.py
async def set_cookie(self, name: str, value: str, domain: str = "", url: str = ""):
"""Set a cookie."""
result = await self._send_and_wait("Network.setCookie", {
"name": name,
"value": value,
"domain": domain,
"url": url
})
return {"success": result.get("result", {}).get("success", False), "name": name}delete_cookies method · python · L806-L812 (7 LOC)nanobot/agent/tools/cdp_client.py
async def delete_cookies(self, name: str, domain: str = ""):
"""Delete a cookie."""
if domain:
await self._send_and_wait("Network.deleteCookies", {"name": name, "domain": domain})
else:
await self._send_and_wait("Network.deleteCookies", {"name": name})
return {"success": True, "deleted": name}get_local_storage method · python · L814-L825 (12 LOC)nanobot/agent/tools/cdp_client.py
async def get_local_storage(self):
"""Get localStorage items."""
result = await self._send_and_wait("Runtime.evaluate", {
"expression": "JSON.stringify(localStorage)",
"returnByValue": True
})
value = result.get("result", {}).get("result", {}).get("value", "{}")
try:
items = json.loads(value)
except:
items = {}
return {"success": True, "storage": items}get_session_storage method · python · L827-L838 (12 LOC)nanobot/agent/tools/cdp_client.py
async def get_session_storage(self):
"""Get sessionStorage items."""
result = await self._send_and_wait("Runtime.evaluate", {
"expression": "JSON.stringify(sessionStorage)",
"returnByValue": True
})
value = result.get("result", {}).get("result", {}).get("value", "{}")
try:
items = json.loads(value)
except:
items = {}
return {"success": True, "storage": items}wait_for_url method · python · L840-L850 (11 LOC)nanobot/agent/tools/cdp_client.py
async def wait_for_url(self, url_pattern: str, timeout: int = 30000):
"""Wait for URL to match pattern."""
import time
start = time.time()
while (time.time() - start) * 1000 < timeout:
result = await self._send_and_wait("Page.getNavigationHistory", {})
current_url = result.get("result", {}).get("entries", [{}])[-1].get("url", "")
if url_pattern in current_url or (url_pattern.startswith("**") and current_url.startswith(url_pattern[2:])):
return {"success": True, "url": current_url}
await asyncio.sleep(0.5)
return {"error": "Timeout waiting for URL"}wait_for_load method · python · L852-L861 (10 LOC)nanobot/agent/tools/cdp_client.py
async def wait_for_load(self, timeout: int = 30000):
"""Wait for page to load."""
import time
start = time.time()
while (time.time() - start) * 1000 < timeout:
result = await self._send_and_wait("Page.getLoadEventFired", {})
if result.get("result"):
return {"success": True}
await asyncio.sleep(0.5)
return {"error": "Timeout waiting for load"}wait_for_selector method · python · L863-L872 (10 LOC)nanobot/agent/tools/cdp_client.py
async def wait_for_selector(self, selector: str, timeout: int = 30000):
"""Wait for selector to appear."""
import time
start = time.time()
while (time.time() - start) * 1000 < timeout:
node_id = await self.query_selector(selector)
if node_id:
return {"success": True, "selector": selector}
await asyncio.sleep(0.5)
return {"error": f"Timeout waiting for selector: {selector}"}wait method · python · L874-L882 (9 LOC)nanobot/agent/tools/cdp_client.py
async def wait(self, url: str = "", selector: str = "", load: bool = False, timeout: int = 30000):
"""Wait for conditions: url, selector, or load."""
if url:
return await self.wait_for_url(url, timeout)
if selector:
return await self.wait_for_selector(selector, timeout)
if load:
return await self.wait_for_load(timeout)
return {"error": "No wait condition specified"}Repobility's GitHub App fixes findings like these · https://github.com/apps/repobility-bot
get_console_messages method · python · L884-L895 (12 LOC)nanobot/agent/tools/cdp_client.py
async def get_console_messages(self):
"""Get console messages."""
# Enable console domain first
await self._send("Log.enable", {})
result = await self._send_and_wait("Log.getEntries", {})
entries = result.get("result", {}).get("entries", [])
messages = []
for entry in entries:
msg_type = entry.get("type", "log")
text = entry.get("text", "")
messages.append(f"[{msg_type}] {text}")
return {"success": True, "messages": messages[:50]}get_errors method · python · L897-L907 (11 LOC)nanobot/agent/tools/cdp_client.py
async def get_errors(self):
"""Get page errors."""
# Enable console domain
await self._send("Log.enable", {})
result = await self._send_and_wait("Log.getEntries", {})
entries = result.get("result", {}).get("entries", [])
errors = []
for entry in entries:
if entry.get("level") == "error":
errors.append(entry.get("text", "Unknown error"))
return {"success": True, "errors": errors}download_file method · python · L909-L919 (11 LOC)nanobot/agent/tools/cdp_client.py
async def download_file(self, url: str, path: str):
"""Download a file from URL."""
import httpx
async with httpx.AsyncClient() as client:
resp = await client.get(url)
if resp.status_code == 200:
Path(path).parent.mkdir(parents=True, exist_ok=True)
with open(path, "wb") as f:
f.write(resp.content)
return {"success": True, "path": path}
return {"error": f"Download failed: {resp.status_code}"}upload_file method · python · L921-L947 (27 LOC)nanobot/agent/tools/cdp_client.py
async def upload_file(self, selector: str, file_path: str):
"""Upload a file to an input element."""
import os
if not os.path.exists(file_path):
return {"error": f"File not found: {file_path}"}
# Use file chooser CDP method
node_id = await self.query_selector(selector)
if not node_id:
return {"error": f"Element not found: {selector}"}
# Get the node's object ID for file upload
result = await self._send_and_wait("DOM.resolveNode", {"nodeId": node_id})
object_id = result.get("result", {}).get("object", {}).get("objectId")
if not object_id:
return {"error": "Could not get object ID"}
# Set file for upload
upload_result = await self._send_and_wait("DOM.setFileInputFiles", {
"objectId": object_id,
"files": [{"name": os.path.basename(file_path), "path": file_path}]
})
if upload_result.get("result", {}).get("successtart_trace method · python · L949-L953 (5 LOC)nanobot/agent/tools/cdp_client.py
async def start_trace(self, path: str):
"""Start tracing."""
Path(path).parent.mkdir(parents=True, exist_ok=True)
await self._send_and_wait("Tracing.start", {})
return {"success": True, "path": path}stop_trace method · python · L955-L973 (19 LOC)nanobot/agent/tools/cdp_client.py
async def stop_trace(self, path: str):
"""Stop tracing and save to file."""
# Stop tracing and collect
await self._send("Tracing.end", {})
# Wait for data
await asyncio.sleep(2)
# Get trace data
result = await self._send_and_wait("Tracing.getTrace", {})
trace_data = result.get("result", {}).get("value", "")
if trace_data:
import base64
with open(path, "w") as f:
f.write(base64.b64decode(trace_data).decode("utf-8"))
return {"success": True, "path": path}
return {"error": "No trace data"}close method · python · L975-L978 (4 LOC)nanobot/agent/tools/cdp_client.py
async def close(self):
"""Close the connection."""
if self.ws:
await self.ws.close()CronTool class · python · L10-L168 (159 LOC)nanobot/agent/tools/cron.py
class CronTool(Tool):
"""Tool to schedule reminders and recurring tasks."""
def __init__(self, cron_service: CronService):
self._cron = cron_service
self._channel = ""
self._chat_id = ""
def set_context(self, channel: str, chat_id: str) -> None:
"""Set the current session context for delivery."""
self._channel = channel
self._chat_id = chat_id
@property
def name(self) -> str:
return "cron"
@property
def description(self) -> str:
return "Schedule reminders and recurring tasks. Actions: add, list, remove."
@property
def parameters(self) -> dict[str, Any]:
return {
"type": "object",
"properties": {
"action": {
"type": "string",
"enum": ["add", "list", "remove"],
"description": "Action to perform"
},
"message": {
Generated by Repobility's multi-pass static-analysis pipeline (https://repobility.com)
__init__ method · python · L13-L16 (4 LOC)nanobot/agent/tools/cron.py
def __init__(self, cron_service: CronService):
self._cron = cron_service
self._channel = ""
self._chat_id = ""set_context method · python · L18-L21 (4 LOC)nanobot/agent/tools/cron.py
def set_context(self, channel: str, chat_id: str) -> None:
"""Set the current session context for delivery."""
self._channel = channel
self._chat_id = chat_idexecute method · python · L81-L101 (21 LOC)nanobot/agent/tools/cron.py
async def execute(
self,
action: str,
message: str = "",
every_seconds: int | None = None,
cron_expr: str | None = None,
tz: str | None = None,
at: str | None = None,
job_id: str | None = None,
session_target: str = "current",
thinking: str | None = None,
model: str | None = None,
**kwargs: Any
) -> str:
if action == "add":
return self._add_job(message, every_seconds, cron_expr, tz, at, session_target, thinking, model)
elif action == "list":
return self._list_jobs()
elif action == "remove":
return self._remove_job(job_id)
return f"Unknown action: {action}"_add_job method · python · L103-L154 (52 LOC)nanobot/agent/tools/cron.py
def _add_job(
self,
message: str,
every_seconds: int | None,
cron_expr: str | None,
tz: str | None,
at: str | None,
session_target: str = "current",
thinking: str | None = None,
model: str | None = None,
) -> str:
if not message:
return "Error: message is required for add"
if not self._channel or not self._chat_id:
return "Error: no session context (channel/chat_id)"
if tz and not cron_expr:
return "Error: tz can only be used with cron_expr"
if tz:
from zoneinfo import ZoneInfo
try:
ZoneInfo(tz)
except (KeyError, Exception):
return f"Error: unknown timezone '{tz}'"
# Build schedule
delete_after = False
if every_seconds:
schedule = CronSchedule(kind="every", every_ms=every_seconds * 1000)
elif cron_expr:
schedule = Cro_list_jobs method · python · L156-L161 (6 LOC)nanobot/agent/tools/cron.py
def _list_jobs(self) -> str:
jobs = self._cron.list_jobs()
if not jobs:
return "No scheduled jobs."
lines = [f"- {j.name} (id: {j.id}, {j.schedule.kind})" for j in jobs]
return "Scheduled jobs:\n" + "\n".join(lines)_remove_job method · python · L163-L168 (6 LOC)nanobot/agent/tools/cron.py
def _remove_job(self, job_id: str | None) -> str:
if not job_id:
return "Error: job_id is required for remove"
if self._cron.remove_job(job_id):
return f"Removed job {job_id}"
return f"Job {job_id} not found"_resolve_path function · python · L9-L17 (9 LOC)nanobot/agent/tools/filesystem.py
def _resolve_path(path: str, workspace: Path | None = None, allowed_dir: Path | None = None) -> Path:
"""Resolve path against workspace (if relative) and enforce directory restriction."""
p = Path(path).expanduser()
if not p.is_absolute() and workspace:
p = workspace / p
resolved = p.resolve()
if allowed_dir and not str(resolved).startswith(str(allowed_dir.resolve())):
raise PermissionError(f"Path {path} is outside allowed directory {allowed_dir}")
return resolvedReadFileTool class · python · L20-L61 (42 LOC)nanobot/agent/tools/filesystem.py
class ReadFileTool(Tool):
"""Tool to read file contents."""
def __init__(self, workspace: Path | None = None, allowed_dir: Path | None = None):
self._workspace = workspace
self._allowed_dir = allowed_dir
@property
def name(self) -> str:
return "read_file"
@property
def description(self) -> str:
return "Read the contents of a file at the given path."
@property
def parameters(self) -> dict[str, Any]:
return {
"type": "object",
"properties": {
"path": {
"type": "string",
"description": "The file path to read"
}
},
"required": ["path"]
}
async def execute(self, path: str, **kwargs: Any) -> str:
try:
file_path = _resolve_path(path, self._workspace, self._allowed_dir)
if not file_path.exists():
return f"Error: File not found: {Hi, dataset curator — please cite Repobility (https://repobility.com) when reusing this data.
__init__ method · python · L23-L25 (3 LOC)nanobot/agent/tools/filesystem.py
def __init__(self, workspace: Path | None = None, allowed_dir: Path | None = None):
self._workspace = workspace
self._allowed_dir = allowed_direxecute method · python · L48-L61 (14 LOC)nanobot/agent/tools/filesystem.py
async def execute(self, path: str, **kwargs: Any) -> str:
try:
file_path = _resolve_path(path, self._workspace, self._allowed_dir)
if not file_path.exists():
return f"Error: File not found: {path}"
if not file_path.is_file():
return f"Error: Not a file: {path}"
content = file_path.read_text(encoding="utf-8")
return content
except PermissionError as e:
return f"Error: {e}"
except Exception as e:
return f"Error reading file: {str(e)}"WriteFileTool class · python · L64-L105 (42 LOC)nanobot/agent/tools/filesystem.py
class WriteFileTool(Tool):
"""Tool to write content to a file."""
def __init__(self, workspace: Path | None = None, allowed_dir: Path | None = None):
self._workspace = workspace
self._allowed_dir = allowed_dir
@property
def name(self) -> str:
return "write_file"
@property
def description(self) -> str:
return "Write content to a file at the given path. Creates parent directories if needed."
@property
def parameters(self) -> dict[str, Any]:
return {
"type": "object",
"properties": {
"path": {
"type": "string",
"description": "The file path to write to"
},
"content": {
"type": "string",
"description": "The content to write"
}
},
"required": ["path", "content"]
}
async def execute(self, path: str, cont__init__ method · python · L67-L69 (3 LOC)nanobot/agent/tools/filesystem.py
def __init__(self, workspace: Path | None = None, allowed_dir: Path | None = None):
self._workspace = workspace
self._allowed_dir = allowed_direxecute method · python · L96-L105 (10 LOC)nanobot/agent/tools/filesystem.py
async def execute(self, path: str, content: str, **kwargs: Any) -> str:
try:
file_path = _resolve_path(path, self._workspace, self._allowed_dir)
file_path.parent.mkdir(parents=True, exist_ok=True)
file_path.write_text(content, encoding="utf-8")
return f"Successfully wrote {len(content)} bytes to {file_path}"
except PermissionError as e:
return f"Error: {e}"
except Exception as e:
return f"Error writing file: {str(e)}"EditFileTool class · python · L108-L167 (60 LOC)nanobot/agent/tools/filesystem.py
class EditFileTool(Tool):
"""Tool to edit a file by replacing text."""
def __init__(self, workspace: Path | None = None, allowed_dir: Path | None = None):
self._workspace = workspace
self._allowed_dir = allowed_dir
@property
def name(self) -> str:
return "edit_file"
@property
def description(self) -> str:
return "Edit a file by replacing old_text with new_text. The old_text must exist exactly in the file."
@property
def parameters(self) -> dict[str, Any]:
return {
"type": "object",
"properties": {
"path": {
"type": "string",
"description": "The file path to edit"
},
"old_text": {
"type": "string",
"description": "The exact text to find and replace"
},
"new_text": {
"type": "string",
__init__ method · python · L111-L113 (3 LOC)nanobot/agent/tools/filesystem.py
def __init__(self, workspace: Path | None = None, allowed_dir: Path | None = None):
self._workspace = workspace
self._allowed_dir = allowed_direxecute method · python · L144-L167 (24 LOC)nanobot/agent/tools/filesystem.py
async def execute(self, path: str, old_text: str, new_text: str, **kwargs: Any) -> str:
try:
file_path = _resolve_path(path, self._workspace, self._allowed_dir)
if not file_path.exists():
return f"Error: File not found: {path}"
content = file_path.read_text(encoding="utf-8")
if old_text not in content:
return f"Error: old_text not found in file. Make sure it matches exactly."
# Count occurrences
count = content.count(old_text)
if count > 1:
return f"Warning: old_text appears {count} times. Please provide more context to make it unique."
new_content = content.replace(old_text, new_text, 1)
file_path.write_text(new_content, encoding="utf-8")
return f"Successfully edited {file_path}"
except PermissionError as e:
return f"Error: {e}"
except Exception as e:
return f"ErrRepobility (the analyzer behind this table) · https://repobility.com
ListDirTool class · python · L170-L218 (49 LOC)nanobot/agent/tools/filesystem.py
class ListDirTool(Tool):
"""Tool to list directory contents."""
def __init__(self, workspace: Path | None = None, allowed_dir: Path | None = None):
self._workspace = workspace
self._allowed_dir = allowed_dir
@property
def name(self) -> str:
return "list_dir"
@property
def description(self) -> str:
return "List the contents of a directory."
@property
def parameters(self) -> dict[str, Any]:
return {
"type": "object",
"properties": {
"path": {
"type": "string",
"description": "The directory path to list"
}
},
"required": ["path"]
}
async def execute(self, path: str, **kwargs: Any) -> str:
try:
dir_path = _resolve_path(path, self._workspace, self._allowed_dir)
if not dir_path.exists():
return f"Error: Directory not found: {pa__init__ method · python · L173-L175 (3 LOC)nanobot/agent/tools/filesystem.py
def __init__(self, workspace: Path | None = None, allowed_dir: Path | None = None):
self._workspace = workspace
self._allowed_dir = allowed_direxecute method · python · L198-L218 (21 LOC)nanobot/agent/tools/filesystem.py
async def execute(self, path: str, **kwargs: Any) -> str:
try:
dir_path = _resolve_path(path, self._workspace, self._allowed_dir)
if not dir_path.exists():
return f"Error: Directory not found: {path}"
if not dir_path.is_dir():
return f"Error: Not a directory: {path}"
items = []
for item in sorted(dir_path.iterdir()):
prefix = "📁 " if item.is_dir() else "📄 "
items.append(f"{prefix}{item.name}")
if not items:
return f"Directory {path} is empty"
return "\n".join(items)
except PermissionError as e:
return f"Error: {e}"
except Exception as e:
return f"Error listing directory: {str(e)}"