Function bodies 522 total
ingest_path function · python · L94-L145 (52 LOC)documents/ingestion.py
def ingest_path(path: Path, document_store: "DocumentStore") -> str:
"""Ingest a file or directory into the document store.
Returns a summary string of what was ingested.
"""
files = []
if path.is_file():
files = [path]
elif path.is_dir():
for ext in SUPPORTED_EXTENSIONS:
files.extend(path.glob(f"**/*{ext}"))
# Security: reject symlinks and paths that escape the target directory
resolved_root = path.resolve()
safe_files = []
for f in files:
if f.is_symlink():
continue
if not f.resolve().is_relative_to(resolved_root):
continue
safe_files.append(f)
files = safe_files
if not files:
return f"No supported files found at {path}"
total_chunks = 0
skipped = 0
for file in files:
try:
text = load_text_file(file)
except ValueError:
skipped += 1
continue
DocumentStore.__init__ method · python · L9-L18 (10 LOC)documents/store.py
def __init__(self, persist_dir: Path):
    """Create the Chroma client and bind the document collection.

    Args:
        persist_dir: Directory handed to Chroma (as a string) as its
            ``persist_directory``.
    """
    settings = Settings(
        persist_directory=str(persist_dir),
        is_persistent=True,
        anonymized_telemetry=False,
    )
    self.client = chromadb.Client(settings)
    # get_or_create: reuse the named collection when it already exists;
    # the metadata requests cosine as the HNSW distance space.
    self.collection = self.client.get_or_create_collection(
        name="chief_of_staff_docs",
        metadata={"hnsw:space": "cosine"},
    )
def add_documents(
    self,
    texts: list[str],
    metadatas: list[dict],
    ids: list[str],
) -> None:
    """Forward a batch of documents to the underlying collection.

    Args:
        texts: Document bodies, passed as ``documents``.
        metadatas: Metadata dicts, passed as ``metadatas``.
        ids: Document identifiers, passed as ``ids``.
    """
    batch = {"documents": texts, "metadatas": metadatas, "ids": ids}
    self.collection.add(**batch)
def search(self, query: str, top_k: int = 5) -> list[dict]:
    """Query the collection and return the closest matches as plain dicts.

    Args:
        query: Text passed to the collection as the single query text.
        top_k: Maximum number of results wanted. It is clamped to the
            number of stored items so the query never asks for more rows
            than the collection holds.

    Returns:
        One dict per hit with ``text``, ``metadata`` and ``distance`` keys.
        ``distance`` is ``None`` when the query result carries no distances.
        An empty list when the collection is empty.
    """
    available = self.collection.count()
    if available == 0:
        return []

    results = self.collection.query(
        query_texts=[query],
        n_results=min(top_k, available),
    )

    # Result lists are nested per query text; only one query was sent, so
    # index 0 holds its hits.
    documents = results["documents"][0]
    metadatas = results["metadatas"][0]
    if results.get("distances"):
        distances = results["distances"][0]
    else:
        distances = [None] * len(documents)

    return [
        {"text": document, "metadata": metadata, "distance": distance}
        for document, metadata, distance in zip(documents, metadatas, distances)
    ]
def _format_date(date_str: str) -> str:
"""Convert ISO date string to human-readable format.
If *date_str* is already human-readable (not ISO), it is returned as-is.
Args:
date_str: Date in YYYY-MM-DD format, or already human-readable.
Returns:
Human-readable date like 'Wednesday, February 25, 2026'.
"""
try:
dt = datetime.strptime(date_str, "%Y-%m-%d")
except ValueError:
# Already human-readable or non-ISO — pass through unchanged
return date_str
try:
return dt.strftime("%A, %B %-d, %Y")
except ValueError:
# Windows does not support %-d; fall back to %d and strip leading zero
return dt.strftime("%A, %B %d, %Y").replace(" 0", " ")_build_calendar_table function · python · L38-L57 (20 LOC)formatter/brief.py
def _build_calendar_table(entries: Sequence[CalendarEntry]) -> Table:
    """Lay out calendar entries as a three-column rich Table.

    Args:
        entries: Calendar entries; the ``time``, ``event`` and ``status``
            keys are read with ``.get`` and default to an empty string.

    Returns:
        A rich Table with Time, Event, and Status columns, one row per entry.
    """
    table = Table(box=INNER_BOX, expand=True, show_edge=False)
    column_specs = (
        ("Time", {"style": "cyan", "no_wrap": True}),
        ("Event", {"style": "white"}),
        ("Status", {"style": "dim"}),
    )
    for header, options in column_specs:
        table.add_column(header, **options)
    for entry in entries:
        cells = [entry.get(key, "") for key in ("time", "event", "status")]
        table.add_row(*cells)
    return table
def _build_action_items(items: Sequence[ActionItem]) -> Text:
    """Format action items as icon-prefixed lines in a rich Text.

    Args:
        items: Action items; ``priority`` (default "medium") selects the
            icon and ``text`` (default "") is the line content.

    Returns:
        A rich Text with one line per item and no trailing newline.
    """
    text = Text()
    final_index = len(items) - 1
    for index, item in enumerate(items):
        level = item.get("priority", "medium")
        # Unknown priorities fall back to the "medium" icon.
        icon = PRIORITY_ICONS.get(level, PRIORITY_ICONS["medium"])
        terminator = "\n" if index < final_index else ""
        text.append(f" {icon} {item.get('text', '')}{terminator}")
    return text
_build_conflicts function · python · L80-L104 (25 LOC)formatter/brief.py
def _build_conflicts(conflicts: Sequence[Conflict]) -> Text:
    """Format scheduling conflicts as lines in a rich Text.

    Args:
        conflicts: Conflict dicts read via the ``time``, ``a`` and ``b`` keys.

    Returns:
        A rich Text with one line per conflict and no trailing newline.
    """
    text = Text()
    final_index = len(conflicts) - 1
    for index, conflict in enumerate(conflicts):
        when = conflict.get("time", "")
        first = conflict.get("a", "")
        second = conflict.get("b", "")
        if not when:
            # No time: treated as a plain string that was coerced into a
            # dict, so only the "a" value is shown.
            line = f" {first}"
        elif second:
            line = f" {when}: {first} \u2190\u2192 {second}"
        else:
            line = f" {when}: {first}"
        if index < final_index:
            line += "\n"
        text.append(line)
    return text
def _build_email_table(highlights: Sequence[EmailHighlight]) -> Table:
    """Lay out email highlights as a three-column rich Table.

    Args:
        highlights: Email highlight dicts; ``sender``, ``subject`` and
            ``tag`` are read with ``.get`` and default to an empty string.

    Returns:
        A rich Table with Sender, Subject, and Tag columns.
    """
    table = Table(box=INNER_BOX, expand=True, show_edge=False)
    column_specs = (
        ("Sender", {"style": "cyan", "no_wrap": True}),
        ("Subject", {"style": "white"}),
        ("Tag", {"style": "dim"}),
    )
    for header, options in column_specs:
        table.add_column(header, **options)
    for highlight in highlights:
        cells = [highlight.get(key, "") for key in ("sender", "subject", "tag")]
        table.add_row(*cells)
    return table
def _build_delegation_table(items: Sequence[dict]) -> Table:
    """Lay out structured delegation dicts as a four-column rich Table.

    Args:
        items: Delegation dicts; ``task``, ``delegated_to``, ``priority``
            and ``status`` are read with ``.get`` and default to "".

    Returns:
        A rich Table with Task, Assigned To, Priority, and Status columns.
    """
    table = Table(box=INNER_BOX, expand=True, show_edge=False)
    column_specs = (
        ("Task", {"style": "white"}),
        ("Assigned To", {"style": "cyan", "no_wrap": True}),
        ("Priority", {"style": "dim"}),
        ("Status", {"style": "dim"}),
    )
    for header, options in column_specs:
        table.add_column(header, **options)
    row_keys = ("task", "delegated_to", "priority", "status")
    for item in items:
        table.add_row(*[item.get(key, "") for key in row_keys])
    return table
def _build_decision_table(items: Sequence[dict]) -> Table:
    """Lay out structured decision dicts as a three-column rich Table.

    Args:
        items: Decision dicts; ``title``, ``status`` and ``owner`` are read
            with ``.get`` and default to "".

    Returns:
        A rich Table with Decision, Status, and Owner columns.
    """
    table = Table(box=INNER_BOX, expand=True, show_edge=False)
    column_specs = (
        ("Decision", {"style": "white"}),
        ("Status", {"style": "dim"}),
        ("Owner", {"style": "cyan", "no_wrap": True}),
    )
    for header, options in column_specs:
        table.add_column(header, **options)
    row_keys = ("title", "status", "owner")
    for item in items:
        table.add_row(*[item.get(key, "") for key in row_keys])
    return table
def _build_okr_table(items: Sequence[dict]) -> Table:
    """Lay out OKR highlight dicts as a four-column rich Table.

    Args:
        items: OKR dicts. The first cell uses ``initiative`` and falls back
            to ``name``; ``team``, ``status`` and ``progress`` default to "".

    Returns:
        A rich Table with Initiative, Team, Status, and Progress columns.
    """
    table = Table(box=INNER_BOX, expand=True, show_edge=False)
    column_specs = (
        ("Initiative", {"style": "white"}),
        ("Team", {"style": "cyan", "no_wrap": True}),
        ("Status", {"style": "dim"}),
        ("Progress", {"style": "dim"}),
    )
    for header, options in column_specs:
        table.add_column(header, **options)
    for item in items:
        label = item.get("initiative", item.get("name", ""))
        trailing = [item.get(key, "") for key in ("team", "status", "progress")]
        table.add_row(label, *trailing)
    return table
def _build_personal(items: Sequence[str]) -> Text:
    """Format personal items as a bulleted list in a rich Text.

    Args:
        items: Sequence of string items.

    Returns:
        A rich Text with one middle-dot-prefixed line per item and no
        trailing newline.
    """
    text = Text()
    final_index = len(items) - 1
    for index, item in enumerate(items):
        terminator = "\n" if index < final_index else ""
        text.append(f" \u00b7 {item}{terminator}")
    return text
def _coerce_item(item, wrap_key: str, defaults: dict | None = None) -> dict:
"""Coerce a plain string into a dict, or return the dict unchanged."""
if isinstance(item, str):
d = {wrap_key: item}
if defaults:
d.update(defaults)
return d
return itemrender_daily function · python · L206-L348 (143 LOC)formatter/brief.py
def render_daily(
date: str,
calendar: Optional[Sequence[CalendarEntry]] = None,
action_items: Optional[Sequence[ActionItem]] = None,
conflicts: Optional[Sequence[Conflict]] = None,
email_highlights: Optional[Sequence[EmailHighlight]] = None,
personal: Optional[Sequence[str]] = None,
delegations: Optional[str] = None,
decisions: Optional[str] = None,
delegation_items: Optional[Sequence[dict]] = None,
decision_items: Optional[Sequence[dict]] = None,
okr_highlights: Optional[Sequence[dict]] = None,
mode: str = "terminal",
width: int = 80,
) -> str:
"""Render a complete daily briefing dashboard.
Sections with no data are omitted entirely. Returns empty string
if all sections are empty.
Args:
date: ISO format date string (YYYY-MM-DD).
calendar: Calendar entries for the day.
action_items: Prioritized action items.
conflicts: Scheduling conflicts.
email_highlights: Notable emails.
Methodology: Repobility · https://repobility.com/research/state-of-ai-code-2026/
render_kv function · python · L15-L39 (25 LOC)formatter/cards.py
def render_kv(fields: dict[str, str], mode: str = "terminal", width: int = 80) -> str:
    """Render key-value pairs as a borderless two-column table.

    Args:
        fields: Dictionary of key-value pairs to display.
        mode: "terminal" for ANSI colour output, "plain" for no markup.
        width: Console width in characters.

    Returns:
        Rendered string, or empty string if fields is empty.
    """
    if not fields:
        return ""

    table = Table(show_header=False, show_edge=False, box=None, padding=(0, 1))
    table.add_column("Key", style="bold")
    table.add_column("Value")
    for label, content in fields.items():
        table.add_row(label, content)

    console = get_console(mode=mode, width=width)
    console.print(table)
    return render_to_string(console)
def render(
title: str,
fields: dict[str, str],
mode: str = "terminal",
status: Optional[str] = None,
body: Optional[str] = None,
width: int = 80,
) -> str:
"""Render a status card with optional status badge and body text.
Args:
title: Card title displayed at the top of the panel.
fields: Key-value pairs to display inside the card.
mode: "terminal" for ANSI colour output, "plain" for no markup.
status: Optional status string (e.g. "ok", "fail") for a coloured badge.
body: Optional body text displayed below the key-value fields.
width: Console width in characters.
Returns:
Rendered card string.
"""
console = get_console(mode=mode, width=width)
# Build the inner content: key-value table
inner_parts: list = []
if fields:
table = Table(show_header=False, show_edge=False, box=None, padding=(0, 1))
table.add_column("Key", style="bold")
table.add_column("get_console function · python · L7-L28 (22 LOC)formatter/console.py
def get_console(mode: str = "terminal", width: int = 80) -> Console:
    """Build a Console that writes to an in-memory buffer.

    Args:
        mode: "plain" disables terminal forcing and colour; any other value
            forces terminal output.
        width: Console width in characters.

    Returns:
        A rich Console instance backed by a fresh ``StringIO``.
    """
    plain = mode == "plain"
    options = {
        "file": StringIO(),
        "force_terminal": not plain,
        "width": width,
    }
    if plain:
        options["no_color"] = True
    return Console(**options)
def render(
title: str,
panels: Sequence[dict],
mode: str = "terminal",
columns: int = 1,
width: int = 80,
) -> str:
"""Render a multi-panel dashboard as a string.
Args:
title: Dashboard header title.
panels: Sequence of dicts, each with ``"title"`` and ``"content"`` keys.
mode: ``"terminal"`` for ANSI colour output, ``"plain"`` for no ANSI.
columns: Number of columns in the grid layout (reserved for future use).
width: Fixed output width.
Returns:
Rendered dashboard string, or empty string if *panels* is empty.
"""
if not panels:
return ""
console = get_console(mode=mode, width=width)
# Header panel with title centered, using BOX style
header = Panel(
Align.center(title),
box=BOX,
style=TITLE_STYLE,
width=width,
)
console.print(header)
# Render each section panel
for panel_spec in panels:
section = Panel(
p_format_time function · python · L14-L24 (11 LOC)formatter/data_helpers.py
def _format_time(iso_str: str) -> str:
"""Convert ISO datetime string to human-readable time like '8:30 AM'."""
try:
dt = datetime.fromisoformat(iso_str)
try:
return dt.strftime("%-I:%M %p")
except ValueError:
# Windows does not support %-I; fall back to %I and strip leading zero
return dt.strftime("%I:%M %p").lstrip("0")
except (ValueError, TypeError):
return iso_strcalendar_events_to_entries function · python · L27-L44 (18 LOC)formatter/data_helpers.py
def calendar_events_to_entries(events: Sequence[dict]) -> list[CalendarEntry]:
    """Map raw calendar event dicts onto the CalendarEntry shape.

    Args:
        events: Raw event dicts. ``start`` is formatted via ``_format_time``;
            the event label uses ``title`` and falls back to ``summary``;
            ``status`` defaults to "".

    Returns:
        List of dicts with ``time``, ``event`` and ``status`` keys.
    """
    return [
        {
            "time": _format_time(event.get("start", "")),
            "event": event.get("title", event.get("summary", "")),
            "status": event.get("status", ""),
        }
        for event in events
    ]
def delegations_to_table_data(
    delegations: Sequence[dict],
) -> tuple[list[str], list[tuple[str, ...]]]:
    """Turn delegation dicts into table headers and row tuples.

    Missing keys become empty strings.

    Returns:
        (columns, rows) tuple for formatter.tables.render().
    """
    columns = ["Task", "Assigned To", "Priority", "Status", "Due"]
    source_keys = ("task", "delegated_to", "priority", "status", "due_date")
    rows = [
        tuple(record.get(key, "") for key in source_keys)
        for record in delegations
    ]
    return columns, rows
def decisions_to_table_data(
    decisions: Sequence[dict],
) -> tuple[list[str], list[tuple[str, ...]]]:
    """Turn decision dicts into table headers and row tuples.

    Missing keys become empty strings.

    Returns:
        (columns, rows) tuple for formatter.tables.render().
    """
    columns = ["Decision", "Status", "Owner", "Follow-up"]
    source_keys = ("title", "status", "owner", "follow_up_date")
    rows = [
        tuple(record.get(key, "") for key in source_keys)
        for record in decisions
    ]
    return columns, rows
delegations_to_summary function · python · L88-L103 (16 LOC)formatter/data_helpers.py
def delegations_to_summary(delegations: Sequence[dict]) -> str:
    """Summarise delegations in one line for the daily brief.

    Counts delegations whose ``status`` is "active" and, among those, the
    ones whose ``priority`` is "high" or "critical".

    Returns:
        Summary string like '2 active, 1 high priority' or empty string
        when *delegations* is empty.
    """
    if not delegations:
        return ""
    active_count = 0
    urgent_count = 0
    for record in delegations:
        if record.get("status") == "active":
            active_count += 1
            if record.get("priority") in ("high", "critical"):
                urgent_count += 1
    summary = f"{active_count} active"
    if urgent_count:
        summary += f", {urgent_count} high priority"
    return summary
def decisions_to_summary(decisions: Sequence[dict]) -> str:
    """Summarise decisions in one line for the daily brief.

    Counts decisions whose ``status`` is "pending_execution".

    Returns:
        Summary string like '2 pending' or empty string when *decisions*
        is empty.
    """
    if not decisions:
        return ""
    pending_count = sum(
        1 for record in decisions if record.get("status") == "pending_execution"
    )
    return f"{pending_count} pending"
def render(
columns: Sequence[str],
rows: Sequence[Sequence[str]],
mode: str = "terminal",
title: Optional[str] = None,
width: int = 80,
) -> str:
"""Render a table as a string.
Args:
columns: Column header labels.
rows: Row data. Rows shorter than *columns* are padded with empty
strings so that Rich never raises on a column-count mismatch.
mode: "terminal" for ANSI colour output, "plain" for no escape codes.
title: Optional table title shown above the header row.
width: Fixed output width in columns.
Returns:
The rendered table string, or an empty string when *rows* is empty.
"""
if not rows:
return ""
num_cols = len(columns)
table = Table(title=title, box=BOX, header_style=HEADER_STYLE, width=width)
for col in columns:
table.add_column(col)
for row in rows:
# Pad short rows with empty strings so Rich does not crash.
padded = list(raudit_log_hook function · python · L20-L49 (30 LOC)hooks/builtin.py
def audit_log_hook(context: dict) -> dict[str, Any]:
"""Append a JSON line to the audit log for every tool call.
Works as both a before_tool_call and after_tool_call hook.
"""
log_path = Path(AUDIT_LOG_PATH)
log_path.parent.mkdir(parents=True, exist_ok=True)
entry = {
"timestamp": context.get("timestamp", ""),
"tool_name": context.get("tool_name", ""),
"agent_name": context.get("agent_name", ""),
"event": "after_tool_call" if "result" in context else "before_tool_call",
}
# Include args only for before_tool_call (to avoid double-logging)
if "result" not in context:
entry["tool_args"] = context.get("tool_args", {})
else:
# Truncate large results to keep the log manageable
result = context.get("result")
result_str = str(result)
if len(result_str) > 500:
result_str = result_str[:500] + "...[truncated]"
entry["result_preview"] = result_str
with open(lotiming_after_hook function · python · L58-L71 (14 LOC)hooks/builtin.py
def timing_after_hook(context: dict) -> dict[str, Any] | None:
    """Report how long a tool call took (after_tool_call).

    Looks up, and removes, the start time stored in ``_timing_store`` under
    the context's ``timestamp`` value.

    Returns:
        Dict with ``tool_name`` and ``elapsed_seconds`` (rounded to four
        decimals), or ``None`` when no start time is stored for that key.
    """
    started_at = _timing_store.pop(context.get("timestamp", ""), None)
    if started_at is None:
        return None
    duration = time.monotonic() - started_at
    return {
        "tool_name": context.get("tool_name", ""),
        "elapsed_seconds": round(duration, 4),
    }
def register_hook(
self,
event_type: str,
callback: Callable[[dict], Any],
*,
name: str = "",
priority: int = 100,
enabled: bool = True,
) -> None:
"""Register a hook callback for an event type.
Args:
event_type: One of EVENT_TYPES.
callback: Callable receiving a context dict. May be sync or async.
name: Optional human-readable name (used in logs).
priority: Lower numbers run first. Default 100.
enabled: If False, the hook is registered but will not fire.
"""
if event_type not in EVENT_TYPES:
raise ValueError(
f"Unknown event type '{event_type}'. Must be one of: {sorted(EVENT_TYPES)}"
)
entry = {
"name": name or getattr(callback, "__name__", "anonymous"),
"callback": callback,
"priority": priority,
"enabled": enabled,
}
sHookRegistry.fire_hooks method · python · L72-L92 (21 LOC)hooks/registry.py
def fire_hooks(self, event_type: str, context: dict) -> list[Any]:
    """Call every enabled hook for *event_type* and collect the results.

    Each hook is handed ``dict(context)`` — a shallow copy, so top-level
    key changes do not leak between hooks, while nested values are still
    shared. Callbacks are invoked synchronously and their return value is
    stored as-is.

    Returns:
        One entry per enabled hook, in iteration order; ``None`` stands in
        for a hook that raised (the exception is logged).

    Raises:
        ValueError: If *event_type* is not in EVENT_TYPES.
    """
    if event_type not in EVENT_TYPES:
        raise ValueError(
            f"Unknown event type '{event_type}'. Must be one of: {sorted(EVENT_TYPES)}"
        )
    outcomes: list[Any] = []
    enabled_hooks = (hook for hook in self._hooks[event_type] if hook["enabled"])
    for hook in enabled_hooks:
        try:
            outcome = hook["callback"](dict(context))
        except Exception:
            logger.exception("Hook '%s' failed for event '%s'", hook["name"], event_type)
            outcome = None
        outcomes.append(outcome)
    return outcomes
def get_hooks(self, event_type: str) -> list[dict[str, Any]]:
    """Return a copy of the hook entry list for *event_type*.

    Mainly for introspection/tests. The list is a new object; the entry
    dicts inside it are the registry's own.

    Raises:
        ValueError: If *event_type* is not in EVENT_TYPES.
    """
    if event_type in EVENT_TYPES:
        return list(self._hooks[event_type])
    raise ValueError(
        f"Unknown event type '{event_type}'. Must be one of: {sorted(EVENT_TYPES)}"
    )
HookRegistry.load_configs method · python · L111-L155 (45 LOC)hooks/registry.py
def load_configs(self, config_dir: str | Path) -> int:
"""Load hook YAML configs from *config_dir*. Returns count of hooks loaded.
Each YAML file should contain a list of hook definitions:
- event_type: before_tool_call
name: my_hook
handler: hooks.builtin.audit_log_hook
priority: 50
enabled: true
"""
config_dir = Path(config_dir)
if not config_dir.is_dir():
logger.warning("Hook config directory does not exist: %s", config_dir)
return 0
loaded = 0
for yaml_file in sorted(config_dir.glob("*.yaml")):
try:
with open(yaml_file) as f:
entries = yaml.safe_load(f)
if not isinstance(entries, list):
logger.warning("Hook config %s: expected a list, got %s", yaml_file, type(entries).__name__)
continue
for entry in entries:
_import_handler function · python · L162-L173 (12 LOC)hooks/registry.py
def _import_handler(dotted_path: str) -> Callable | None:
"""Import a callable from a dotted module path like 'hooks.builtin.audit_log_hook'."""
parts = dotted_path.rsplit(".", 1)
if len(parts) != 2:
return None
module_path, attr_name = parts
try:
import importlib
mod = importlib.import_module(module_path)
return getattr(mod, attr_name, None)
except (ImportError, AttributeError):
return Nonebuild_tool_context function · python · L176-L191 (16 LOC)hooks/registry.py
def build_tool_context(
    tool_name: str,
    tool_args: dict,
    agent_name: str = "",
    result: Any = None,
) -> dict:
    """Assemble the standard context dict for tool-related hook events.

    The dict always carries ``tool_name``, ``tool_args``, ``agent_name`` and
    a UTC ISO-8601 ``timestamp``. A ``result`` key is added only when
    *result* is not ``None``, so a ``None`` result yields the same context
    as omitting *result*.
    """
    now_utc = datetime.now(timezone.utc)
    ctx: dict[str, Any] = dict(
        tool_name=tool_name,
        tool_args=tool_args,
        agent_name=agent_name,
        timestamp=now_utc.isoformat(),
    )
    if result is None:
        return ctx
    ctx["result"] = result
    return ctx
def extract_transformed_args(hook_results: list[Any]) -> dict | None:
"""Extract transformed tool_args from before_tool_call hook results.
Hooks that want to modify tool args return a dict with a "tool_args" key.
The last hook that returns transformed args wins.
Returns None if no hook returned transformed args.
"""
transformed = None
for result in hook_results:
if isinstance(result, dict) and "tool_args" in result:
transformed = result["tool_args"]
return transformedhumanize_hook function · python · L19-L47 (29 LOC)humanizer/hook.py
def humanize_hook(context: dict) -> dict[str, Any] | None:
    """before_tool_call hook that humanizes outbound text fields.

    Only tools listed in OUTBOUND_TOOLS are considered. Each TEXT_FIELDS
    entry holding a non-empty string is run through ``humanize``.

    Returns:
        ``{"tool_args": <copy with rewritten fields>}`` when at least one
        field changed; ``None`` when the tool is not outbound, has no args,
        or nothing changed.
    """
    if context.get("tool_name", "") not in OUTBOUND_TOOLS:
        return None
    tool_args = context.get("tool_args", {})
    if not tool_args:
        return None

    # Work on a copy so the caller's args are left untouched.
    outgoing = dict(tool_args)
    touched = False
    for field in TEXT_FIELDS:
        current = outgoing.get(field)
        if not current or not isinstance(current, str):
            continue
        rewritten = humanize(current)
        if rewritten == current:
            continue
        outgoing[field] = rewritten
        touched = True
    return {"tool_args": outgoing} if touched else None
def _build_rules() -> list[HumanizerRule]:
"""Build the default set of humanizer rules."""
rules = []
# --- Em dash removal ---
rules.append(HumanizerRule(
name="em_dash",
pattern=re.compile(r"\s*\u2014\s*"),
replacement=", ",
description="Replace em dashes with commas",
))
rules.append(HumanizerRule(
name="double_hyphen_em_dash",
pattern=re.compile(r"\s*--\s*"),
replacement=", ",
description="Replace double-hyphen em dashes with commas",
))
# --- AI vocabulary swaps ---
vocab_swaps = [
(r"\bAdditionally\b", "Also"),
(r"\badditionally\b", "also"),
(r"\butilize\b", "use"),
(r"\bUtilize\b", "Use"),
(r"\butilizing\b", "using"),
(r"\bUtilizing\b", "Using"),
(r"\bleverage\b", "use"),
(r"\bLeverage\b", "Use"),
(r"\bleveraging\b", "using"),
(r"\bLeveraging\b", "Using"),
(r"\bfacilitate\b", "help with"),
humanize function · python · L182-L209 (28 LOC)humanizer/rules.py
def humanize(text: str | None, rules: list[HumanizerRule] | None = None) -> str:
    """Run the humanizer rules over *text* and tidy the whitespace.

    Args:
        text: Input text to humanize. None or empty returns empty string.
        rules: Optional custom rule list. Defaults to DEFAULT_RULES.

    Returns:
        The text after every rule's substitution, the whitespace clean-up
        passes, and a final strip.
    """
    if not text:
        return ""
    active_rules = DEFAULT_RULES if rules is None else rules

    cleaned = text
    for rule in active_rules:
        cleaned = rule.pattern.sub(rule.replacement, cleaned)

    tidy_passes = (
        (r" +", " "),  # double spaces left by removals
        (r" ([.,;:!?])", r"\1"),  # space before punctuation
        (r"(?m)^ +", ""),  # leading space on lines
    )
    for pattern, replacement in tidy_passes:
        cleaned = re.sub(pattern, replacement, cleaned)
    return cleaned.strip()
async def app_lifespan(server: FastMCP):
"""Initialize shared resources on startup, clean up on shutdown."""
app_config.DATA_DIR.mkdir(parents=True, exist_ok=True)
app_config.AGENT_CONFIGS_DIR.mkdir(parents=True, exist_ok=True)
routing_db_candidate = getattr(app_config, "CALENDAR_ROUTING_DB_PATH", None)
routing_db_path = Path(routing_db_candidate) if isinstance(routing_db_candidate, (str, Path)) else (app_config.DATA_DIR / "calendar-routing.db")
dual_read_candidate = getattr(app_config, "CALENDAR_REQUIRE_DUAL_READ", True)
require_dual_read = bool(dual_read_candidate) if isinstance(dual_read_candidate, bool) else True
claude_bin_candidate = getattr(app_config, "CLAUDE_BIN", "claude")
claude_bin = claude_bin_candidate if isinstance(claude_bin_candidate, str) and claude_bin_candidate else "claude"
claude_mcp_candidate = getattr(app_config, "CLAUDE_MCP_CONFIG", "")
claude_mcp_config = claude_mcp_candidate if isinstance(claude_mcp_candidate, str) All rows above produced by Repobility · https://repobility.com
register function · python · L9-L163 (155 LOC)mcp_tools/agent_tools.py
def register(mcp, state):
"""Register agent tools with the FastMCP server."""
@mcp.tool()
async def list_agents() -> str:
"""List all available expert agent configurations."""
agent_registry = state.agent_registry
agents = agent_registry.list_agents()
if not agents:
return json.dumps({"message": "No agents configured yet.", "results": []})
results = [
{"name": a.name, "description": a.description, "capabilities": a.capabilities}
for a in agents
]
return json.dumps({"results": results})
@mcp.tool()
async def get_agent(name: str) -> str:
"""Get full details for a specific expert agent by name.
Args:
name: The agent name to look up
"""
agent_registry = state.agent_registry
agent = agent_registry.get_agent(name)
if not agent:
return json.dumps({"error": f"Agent '{name}' not found."})
return json.duregister function · python · L18-L82 (65 LOC)mcp_tools/brain_tools.py
def register(mcp, state):
global _state_ref
_state_ref = state
@mcp.tool()
async def get_session_brain() -> str:
"""Get the current Session Brain -- persistent cross-session context.
Returns the full brain state: active workstreams, open action items,
recent decisions, key people context, and session handoff notes.
"""
brain = _get_brain()
if brain is None:
return json.dumps({"error": "Session brain not initialized"})
brain.load()
return json.dumps(brain.to_dict())
@mcp.tool()
async def update_session_brain(action: str, data: str) -> str:
"""Update the Session Brain with new information.
Args:
action: One of: add_workstream, update_workstream, add_action_item,
complete_action_item, add_decision, add_person, add_handoff_note
data: JSON string with action-specific fields:
- add_workstream: {"name", "status", "cregister function · python · L11-L71 (61 LOC)mcp_tools/channel_tools.py
def register(mcp, state):
"""Register channel tools with the FastMCP server."""
@mcp.tool()
async def list_inbound_events(
channel: str = "",
event_type: str = "",
limit: int = 25,
) -> str:
"""List recent inbound events normalized across channels (iMessage, Mail, Webhook).
Args:
channel: Filter by channel ("imessage", "mail", "webhook"). Leave empty for all.
event_type: Filter by event type ("message", "email", "webhook_event"). Leave empty for all.
limit: Maximum events per channel (default 25, max 100)
"""
limit = max(1, min(limit, 100))
channels_to_query = (
[channel] if channel else ["imessage", "mail", "webhook"]
)
all_events = []
for ch in channels_to_query:
raw_events = _fetch_raw_events(state, ch, limit)
for raw in raw_events:
if "error" in raw:
continue
_fetch_raw_events function · python · L74-L107 (34 LOC)mcp_tools/channel_tools.py
def _fetch_raw_events(state, channel: str, limit: int) -> list[dict]:
"""Fetch raw events from the appropriate store for a channel."""
try:
if channel == "imessage":
messages_store = state.messages_store
if messages_store is None:
return []
return messages_store.get_messages(minutes=24 * 60, limit=limit)
elif channel == "mail":
mail_store = state.mail_store
if mail_store is None:
return []
return mail_store.get_messages(limit=limit)
elif channel == "webhook":
memory_store = state.memory_store
if memory_store is None:
return []
events = memory_store.list_webhook_events(limit=limit)
return [
{
"id": e.id,
"source": e.source,
"event_type": e.event_type,
"payload": e.payload,
"stregister function · python · L14-L78 (65 LOC)mcp_tools/document_tools.py
def register(mcp, state):
"""Register document tools with the FastMCP server."""
@mcp.tool()
async def search_documents(query: str, top_k: int = 5) -> str:
"""Semantic search over ingested documents. Returns the most relevant chunks.
Args:
query: Natural language search query
top_k: Number of results to return (default 5)
"""
document_store = state.document_store
try:
results = _retry_on_transient(document_store.search, query, top_k=top_k)
if not results:
return json.dumps({"message": "No documents found. Ingest documents first.", "results": []})
try:
state.memory_store.record_skill_usage("search_documents", query)
except Exception:
pass
return json.dumps({"results": results})
except (sqlite3.OperationalError, ValueError, KeyError) as e:
return json.dumps({"error": f"Database errorregister function · python · L10-L121 (112 LOC)mcp_tools/enrichment.py
def register(mcp, state):
"""Register enrichment tools with the MCP server."""
@mcp.tool()
async def enrich_person(name: str, days_back: int = 7) -> str:
"""Get a consolidated profile for a person: identities, facts, delegations, decisions, recent messages, and emails.
Fetches from 6 data sources in parallel. Much faster than calling each tool separately.
Empty sections are omitted. If a data source is unavailable, that section is silently skipped.
Args:
name: Person's name to search for (canonical name or partial match)
days_back: How many days back to search communications (default 7)
"""
context = {"name": name}
minutes = days_back * 1440
async def fetch_identities():
try:
results = state.memory_store.search_identity(name)
if results:
return ("identities", results[:10])
except Exception as e:
register function · python · L14-L200 (187 LOC)mcp_tools/event_rule_tools.py
def register(mcp, state):
"""Register event rule tools with the FastMCP server."""
@mcp.tool()
async def create_event_rule(
name: str,
event_source: str,
event_type_pattern: str,
agent_name: str,
description: str = "",
agent_input_template: str = "",
delivery_channel: str = "",
delivery_config: str = "",
enabled: bool = True,
priority: int = 100,
) -> str:
"""Create an event rule that triggers an agent when a matching webhook event arrives.
Args:
name: Unique name for this rule (required)
event_source: Source to match (e.g. "github", "jira") (required)
event_type_pattern: Glob pattern for event types (e.g. "alert.*", "incident.critical") (required)
agent_name: Name of the expert agent to activate (required)
description: Human-readable description of what this rule does
agent_input_template: Template forregister function · python · L18-L150 (133 LOC)mcp_tools/formatter_tools.py
def register(mcp, state):
"""Register formatter tools with the FastMCP server."""
@mcp.tool()
async def format_table(title: str, columns: str, rows: str, mode: str = "terminal") -> str:
"""Render a formatted table from structured data.
Args:
title: Table title
columns: JSON array of column header names (e.g. '["Time", "Event", "Status"]')
rows: JSON array of row arrays (e.g. '[["8:30 AM", "ePMLT", "Zoom"]]')
mode: Render mode — "terminal" for ANSI color, "plain" for no-color text (default: terminal)
"""
try:
from formatter.tables import render
parsed_columns = json.loads(columns)
parsed_rows = json.loads(rows)
result = render(
title=title,
columns=parsed_columns,
rows=parsed_rows,
mode=mode,
)
if not result:
return json.dumps({"result": ""})
Methodology: Repobility · https://repobility.com/research/state-of-ai-code-2026/
register function · python · L6-L90 (85 LOC)mcp_tools/identity_tools.py
def register(mcp, state):
"""Register identity linking tools with the MCP server."""
@mcp.tool()
async def link_identity(
canonical_name: str,
provider: str,
provider_id: str,
display_name: str = "",
email: str = "",
) -> str:
"""Link a provider identity to a canonical person name.
Maps a person's account on a specific provider (iMessage, email, Teams, etc.)
to their canonical name, enabling cross-channel identity resolution.
Args:
canonical_name: The person's canonical name (e.g. "Jane Smith")
provider: Provider name: imessage, email, m365_teams, m365_email, slack, jira, confluence
provider_id: Unique ID on the provider (phone number, email, user ID, etc.)
display_name: Optional display name on the provider
email: Optional email address associated with this identity
"""
try:
result = state.memory_store.link_iregister function · python · L6-L176 (171 LOC)mcp_tools/imessage_tools.py
def register(mcp, state):
"""Register iMessage tools with the MCP server."""
@mcp.tool()
async def get_imessages(
minutes: int = 60,
limit: int = 25,
include_from_me: bool = True,
conversation: str = "",
) -> str:
"""Get recent iMessages from Messages.app chat history.
Args:
minutes: Lookback window in minutes (default: 60)
limit: Maximum number of messages to return (default: 25, max: 200)
include_from_me: Include your own sent messages (default: True)
conversation: Optional sender/chat identifier filter
"""
messages_store = state.messages_store
try:
messages = messages_store.get_messages(
minutes=minutes,
limit=limit,
include_from_me=include_from_me,
conversation=conversation,
)
return json.dumps({"results": messages})
except Exception as e:
_format_delegations function · python · L11-L24 (14 LOC)mcp_tools/lifecycle_tools.py
def _format_delegations(results):
"""Add formatted table to delegation results."""
try:
from formatter.data_helpers import delegations_to_table_data
from formatter.tables import render
items = results.get("results", [])
if not items:
results["formatted"] = ""
return results
columns, rows = delegations_to_table_data(items)
results["formatted"] = render(columns=columns, rows=rows, title="Delegations", mode="plain")
except Exception:
results["formatted"] = ""
return results