← back to jrichyrich__chief-of-staff

Function bodies 522 total

All specs Real LLM only Function bodies
ingest_path function · python · L94-L145 (52 LOC)
documents/ingestion.py
def ingest_path(path: Path, document_store: "DocumentStore") -> str:
    """Ingest a file or directory into the document store.

    Returns a summary string of what was ingested.
    """
    files = []

    if path.is_file():
        files = [path]
    elif path.is_dir():
        for ext in SUPPORTED_EXTENSIONS:
            files.extend(path.glob(f"**/*{ext}"))
        # Security: reject symlinks and paths that escape the target directory
        resolved_root = path.resolve()
        safe_files = []
        for f in files:
            if f.is_symlink():
                continue
            if not f.resolve().is_relative_to(resolved_root):
                continue
            safe_files.append(f)
        files = safe_files

    if not files:
        return f"No supported files found at {path}"

    total_chunks = 0
    skipped = 0
    for file in files:
        try:
            text = load_text_file(file)
        except ValueError:
            skipped += 1
            continue
       
DocumentStore.__init__ method · python · L9-L18 (10 LOC)
documents/store.py
    def __init__(self, persist_dir: Path):
        """Create the Chroma client and bind the document collection.

        Args:
            persist_dir: Directory passed to Chroma as its persistence
                location (converted to ``str``).
        """
        settings = Settings(
            persist_directory=str(persist_dir),
            is_persistent=True,
            anonymized_telemetry=False,
        )
        self.client = chromadb.Client(settings)
        # The collection is created on first use; the HNSW space is set to
        # cosine through the collection metadata.
        self.collection = self.client.get_or_create_collection(
            name="chief_of_staff_docs",
            metadata={"hnsw:space": "cosine"},
        )
DocumentStore.add_documents method · python · L20-L26 (7 LOC)
documents/store.py
    def add_documents(
        self,
        texts: list[str],
        metadatas: list[dict],
        ids: list[str],
    ) -> None:
        """Forward a batch of documents to the underlying collection.

        Args:
            texts: Document bodies, passed as ``documents``.
            metadatas: Metadata dicts, passed as ``metadatas``.
            ids: Identifiers, passed as ``ids``.
        """
        self.collection.add(
            ids=ids,
            metadatas=metadatas,
            documents=texts,
        )
DocumentStore.search method · python · L28-L39 (12 LOC)
documents/store.py
    def search(self, query: str, top_k: int = 5) -> list[dict]:
        """Query the collection and flatten the first result set.

        Args:
            query: Text sent to the collection as the single query.
            top_k: Value passed as ``n_results``.

        Returns:
            One dict per returned document with ``text``, ``metadata`` and
            ``distance`` keys (``distance`` is ``None`` when the response has
            no distances). An empty list when the collection is empty.
        """
        if self.collection.count() == 0:
            return []
        results = self.collection.query(query_texts=[query], n_results=top_k)
        # Only one query text is sent, so only index 0 of each field is read.
        return [
            {
                "text": doc,
                "metadata": results["metadatas"][0][pos],
                "distance": results["distances"][0][pos] if results.get("distances") else None,
            }
            for pos, doc in enumerate(results["documents"][0])
        ]
_format_date function · python · L15-L35 (21 LOC)
formatter/brief.py
def _format_date(date_str: str) -> str:
    """Convert ISO date string to human-readable format.

    If *date_str* is already human-readable (not ISO), it is returned as-is.

    Args:
        date_str: Date in YYYY-MM-DD format, or already human-readable.

    Returns:
        Human-readable date like 'Wednesday, February 25, 2026'.
    """
    try:
        dt = datetime.strptime(date_str, "%Y-%m-%d")
    except ValueError:
        # Already human-readable or non-ISO — pass through unchanged
        return date_str
    try:
        return dt.strftime("%A, %B %-d, %Y")
    except ValueError:
        # Windows does not support %-d; fall back to %d and strip leading zero
        return dt.strftime("%A, %B %d, %Y").replace(" 0", " ")
_build_calendar_table function · python · L38-L57 (20 LOC)
formatter/brief.py
def _build_calendar_table(entries: Sequence[CalendarEntry]) -> Table:
    """Assemble the calendar section as a rich Table.

    Args:
        entries: Sequence of CalendarEntry dicts; the ``time``, ``event`` and
            ``status`` keys are read, each defaulting to an empty string.

    Returns:
        A rich Table with Time, Event, and Status columns.
    """
    grid = Table(box=INNER_BOX, expand=True, show_edge=False)
    grid.add_column("Time", style="cyan", no_wrap=True)
    grid.add_column("Event", style="white")
    grid.add_column("Status", style="dim")
    for row in entries:
        cells = [row.get(key, "") for key in ("time", "event", "status")]
        grid.add_row(*cells)
    return grid
_build_action_items function · python · L60-L77 (18 LOC)
formatter/brief.py
def _build_action_items(items: Sequence[ActionItem]) -> Text:
    """Render action items as icon-prefixed lines.

    Args:
        items: Sequence of ActionItem dicts; ``priority`` (default
            ``"medium"``) selects the icon and ``text`` supplies the label.

    Returns:
        A rich Text object with one line per action item and no trailing
        newline after the last one.
    """
    rendered = Text()
    final_index = len(items) - 1
    for position, entry in enumerate(items):
        level = entry.get("priority", "medium")
        # Unknown priorities fall back to the medium icon.
        marker = PRIORITY_ICONS.get(level, PRIORITY_ICONS["medium"])
        terminator = "" if position == final_index else "\n"
        rendered.append(f"  {marker} {entry.get('text', '')}{terminator}")
    return rendered
All rows above produced by Repobility · https://repobility.com
_build_conflicts function · python · L80-L104 (25 LOC)
formatter/brief.py
def _build_conflicts(conflicts: Sequence[Conflict]) -> Text:
    """Render scheduling conflicts as indented lines.

    Args:
        conflicts: Sequence of Conflict dicts; the ``time``, ``a`` and ``b``
            keys are read, each defaulting to an empty string.

    Returns:
        A rich Text object with one line per conflict and no trailing
        newline after the last one.
    """
    rendered = Text()
    final_index = len(conflicts) - 1
    for position, entry in enumerate(conflicts):
        when = entry.get("time", "")
        first = entry.get("a", "")
        second = entry.get("b", "")
        if not when:
            # Plain string was coerced — just display it
            body = f"  {first}"
        elif second:
            body = f"  {when}: {first} \u2190\u2192 {second}"
        else:
            body = f"  {when}: {first}"
        if position != final_index:
            body += "\n"
        rendered.append(body)
    return rendered
_build_email_table function · python · L107-L126 (20 LOC)
formatter/brief.py
def _build_email_table(highlights: Sequence[EmailHighlight]) -> Table:
    """Assemble the email highlights section as a rich Table.

    Args:
        highlights: Sequence of EmailHighlight dicts; the ``sender``,
            ``subject`` and ``tag`` keys are read, each defaulting to an
            empty string.

    Returns:
        A rich Table with Sender, Subject, and Tag columns.
    """
    grid = Table(box=INNER_BOX, expand=True, show_edge=False)
    grid.add_column("Sender", style="cyan", no_wrap=True)
    grid.add_column("Subject", style="white")
    grid.add_column("Tag", style="dim")
    for row in highlights:
        cells = [row.get(key, "") for key in ("sender", "subject", "tag")]
        grid.add_row(*cells)
    return grid
_build_delegation_table function · python · L129-L143 (15 LOC)
formatter/brief.py
def _build_delegation_table(items: Sequence[dict]) -> Table:
    """Assemble structured delegation data as a rich Table.

    Args:
        items: Delegation dicts; the ``task``, ``delegated_to``, ``priority``
            and ``status`` keys are read, each defaulting to an empty string.

    Returns:
        A rich Table with Task, Assigned To, Priority, and Status columns.
    """
    grid = Table(box=INNER_BOX, expand=True, show_edge=False)
    grid.add_column("Task", style="white")
    grid.add_column("Assigned To", style="cyan", no_wrap=True)
    grid.add_column("Priority", style="dim")
    grid.add_column("Status", style="dim")
    field_order = ("task", "delegated_to", "priority", "status")
    for row in items:
        grid.add_row(*[row.get(key, "") for key in field_order])
    return grid
_build_decision_table function · python · L146-L158 (13 LOC)
formatter/brief.py
def _build_decision_table(items: Sequence[dict]) -> Table:
    """Assemble structured decision data as a rich Table.

    Args:
        items: Decision dicts; the ``title``, ``status`` and ``owner`` keys
            are read, each defaulting to an empty string.

    Returns:
        A rich Table with Decision, Status, and Owner columns.
    """
    grid = Table(box=INNER_BOX, expand=True, show_edge=False)
    grid.add_column("Decision", style="white")
    grid.add_column("Status", style="dim")
    grid.add_column("Owner", style="cyan", no_wrap=True)
    field_order = ("title", "status", "owner")
    for row in items:
        grid.add_row(*[row.get(key, "") for key in field_order])
    return grid
_build_okr_table function · python · L161-L175 (15 LOC)
formatter/brief.py
def _build_okr_table(items: Sequence[dict]) -> Table:
    """Assemble OKR highlights as a rich Table.

    Args:
        items: OKR dicts; the first column uses ``initiative`` and falls back
            to ``name``. ``team``, ``status`` and ``progress`` default to an
            empty string.

    Returns:
        A rich Table with Initiative, Team, Status, and Progress columns.
    """
    grid = Table(box=INNER_BOX, expand=True, show_edge=False)
    grid.add_column("Initiative", style="white")
    grid.add_column("Team", style="cyan", no_wrap=True)
    grid.add_column("Status", style="dim")
    grid.add_column("Progress", style="dim")
    for row in items:
        label = row.get("initiative", row.get("name", ""))
        trailing = [row.get(key, "") for key in ("team", "status", "progress")]
        grid.add_row(label, *trailing)
    return grid
_build_personal function · python · L178-L193 (16 LOC)
formatter/brief.py
def _build_personal(items: Sequence[str]) -> Text:
    """Render personal items as a middle-dot bulleted list.

    Args:
        items: Sequence of string items.

    Returns:
        A rich Text object with bullet-prefixed lines and no trailing newline
        after the last one.
    """
    rendered = Text()
    final_index = len(items) - 1
    for position, entry in enumerate(items):
        terminator = "" if position == final_index else "\n"
        rendered.append(f"  \u00b7 {entry}{terminator}")
    return rendered
_coerce_item function · python · L196-L203 (8 LOC)
formatter/brief.py
def _coerce_item(item, wrap_key: str, defaults: dict | None = None) -> dict:
    """Coerce a plain string into a dict, or return the dict unchanged."""
    if isinstance(item, str):
        d = {wrap_key: item}
        if defaults:
            d.update(defaults)
        return d
    return item
render_daily function · python · L206-L348 (143 LOC)
formatter/brief.py
def render_daily(
    date: str,
    calendar: Optional[Sequence[CalendarEntry]] = None,
    action_items: Optional[Sequence[ActionItem]] = None,
    conflicts: Optional[Sequence[Conflict]] = None,
    email_highlights: Optional[Sequence[EmailHighlight]] = None,
    personal: Optional[Sequence[str]] = None,
    delegations: Optional[str] = None,
    decisions: Optional[str] = None,
    delegation_items: Optional[Sequence[dict]] = None,
    decision_items: Optional[Sequence[dict]] = None,
    okr_highlights: Optional[Sequence[dict]] = None,
    mode: str = "terminal",
    width: int = 80,
) -> str:
    """Render a complete daily briefing dashboard.

    Sections with no data are omitted entirely. Returns empty string
    if all sections are empty.

    Args:
        date: ISO format date string (YYYY-MM-DD).
        calendar: Calendar entries for the day.
        action_items: Prioritized action items.
        conflicts: Scheduling conflicts.
        email_highlights: Notable emails.
  
Methodology: Repobility · https://repobility.com/research/state-of-ai-code-2026/
render_kv function · python · L15-L39 (25 LOC)
formatter/cards.py
def render_kv(fields: dict[str, str], mode: str = "terminal", width: int = 80) -> str:
    """Render key-value pairs as a borderless two-column table.

    Args:
        fields: Dictionary of key-value pairs to display.
        mode: Render mode forwarded to ``get_console``.
        width: Console width forwarded to ``get_console``.

    Returns:
        Rendered string, or empty string if fields is empty.
    """
    if not fields:
        return ""

    # Headerless, edgeless grid: bold keys on the left, values on the right.
    grid = Table(show_header=False, show_edge=False, box=None, padding=(0, 1))
    grid.add_column("Key", style="bold")
    grid.add_column("Value")
    for label, content in fields.items():
        grid.add_row(label, content)

    console = get_console(mode=mode, width=width)
    console.print(grid)
    return render_to_string(console)
render function · python · L42-L102 (61 LOC)
formatter/cards.py
def render(
    title: str,
    fields: dict[str, str],
    mode: str = "terminal",
    status: Optional[str] = None,
    body: Optional[str] = None,
    width: int = 80,
) -> str:
    """Render a status card with optional status badge and body text.

    Args:
        title: Card title displayed at the top of the panel.
        fields: Key-value pairs to display inside the card.
        mode: "terminal" for ANSI colour output, "plain" for no markup.
        status: Optional status string (e.g. "ok", "fail") for a coloured badge.
        body: Optional body text displayed below the key-value fields.
        width: Console width in characters.

    Returns:
        Rendered card string.
    """
    console = get_console(mode=mode, width=width)

    # Build the inner content: key-value table
    inner_parts: list = []

    if fields:
        table = Table(show_header=False, show_edge=False, box=None, padding=(0, 1))
        table.add_column("Key", style="bold")
        table.add_column("
get_console function · python · L7-L28 (22 LOC)
formatter/console.py
def get_console(mode: str = "terminal", width: int = 80) -> Console:
    """Build a Console that writes into an in-memory buffer.

    Args:
        mode: "plain" disables forced-terminal output and colour; any other
            value forces terminal output.
        width: Console width in characters.

    Returns:
        A rich Console instance backed by a fresh StringIO.
    """
    plain = mode == "plain"
    options = {
        "file": StringIO(),
        "force_terminal": not plain,
        "width": width,
    }
    if plain:
        options["no_color"] = True
    return Console(**options)
render function · python · L14-L57 (44 LOC)
formatter/dashboard.py
def render(
    title: str,
    panels: Sequence[dict],
    mode: str = "terminal",
    columns: int = 1,
    width: int = 80,
) -> str:
    """Render a multi-panel dashboard as a string.

    Args:
        title: Dashboard header title.
        panels: Sequence of dicts, each with ``"title"`` and ``"content"`` keys.
        mode: ``"terminal"`` for ANSI colour output, ``"plain"`` for no ANSI.
        columns: Number of columns in the grid layout (reserved for future use).
        width: Fixed output width.

    Returns:
        Rendered dashboard string, or empty string if *panels* is empty.
    """
    if not panels:
        return ""

    console = get_console(mode=mode, width=width)

    # Header panel with title centered, using BOX style
    header = Panel(
        Align.center(title),
        box=BOX,
        style=TITLE_STYLE,
        width=width,
    )
    console.print(header)

    # Render each section panel
    for panel_spec in panels:
        section = Panel(
            p
_format_time function · python · L14-L24 (11 LOC)
formatter/data_helpers.py
def _format_time(iso_str: str) -> str:
    """Convert ISO datetime string to human-readable time like '8:30 AM'."""
    try:
        dt = datetime.fromisoformat(iso_str)
        try:
            return dt.strftime("%-I:%M %p")
        except ValueError:
            # Windows does not support %-I; fall back to %I and strip leading zero
            return dt.strftime("%I:%M %p").lstrip("0")
    except (ValueError, TypeError):
        return iso_str
calendar_events_to_entries function · python · L27-L44 (18 LOC)
formatter/data_helpers.py
def calendar_events_to_entries(events: Sequence[dict]) -> list[CalendarEntry]:
    """Map raw calendar event dicts onto CalendarEntry dicts.

    Args:
        events: Raw event dicts. ``start`` is formatted with
            ``_format_time``; the event label is ``title``, falling back to
            ``summary``; ``status`` is copied. Missing keys become empty
            strings.

    Returns:
        List of CalendarEntry dicts with time, event, status keys.
    """
    return [
        {
            "time": _format_time(raw.get("start", "")),
            "event": raw.get("title", raw.get("summary", "")),
            "status": raw.get("status", ""),
        }
        for raw in events
    ]
delegations_to_table_data function · python · L47-L65 (19 LOC)
formatter/data_helpers.py
def delegations_to_table_data(
    delegations: Sequence[dict],
) -> tuple[list[str], list[tuple[str, ...]]]:
    """Turn raw delegation dicts into header labels and row tuples.

    Args:
        delegations: Delegation dicts; missing keys become empty strings.

    Returns:
        (columns, rows) tuple for formatter.tables.render().
    """
    headers = ["Task", "Assigned To", "Priority", "Status", "Due"]
    # Dict keys, in the same order as the headers above.
    field_order = ("task", "delegated_to", "priority", "status", "due_date")
    body = [
        tuple(record.get(key, "") for key in field_order)
        for record in delegations
    ]
    return headers, body
decisions_to_table_data function · python · L68-L85 (18 LOC)
formatter/data_helpers.py
def decisions_to_table_data(
    decisions: Sequence[dict],
) -> tuple[list[str], list[tuple[str, ...]]]:
    """Turn raw decision dicts into header labels and row tuples.

    Args:
        decisions: Decision dicts; missing keys become empty strings.

    Returns:
        (columns, rows) tuple for formatter.tables.render().
    """
    headers = ["Decision", "Status", "Owner", "Follow-up"]
    # Dict keys, in the same order as the headers above.
    field_order = ("title", "status", "owner", "follow_up_date")
    body = [
        tuple(record.get(key, "") for key in field_order)
        for record in decisions
    ]
    return headers, body
Want this analysis on your repo? https://repobility.com/scan/
delegations_to_summary function · python · L88-L103 (16 LOC)
formatter/data_helpers.py
def delegations_to_summary(delegations: Sequence[dict]) -> str:
    """Summarise delegations in one line for the daily brief.

    Only delegations whose ``status`` is ``"active"`` are counted; among
    those, ``"high"`` and ``"critical"`` priorities are reported separately.

    Returns:
        Summary string like '2 active, 1 high priority' or empty string when
        *delegations* is empty.
    """
    if not delegations:
        return ""

    active_count = 0
    urgent_count = 0
    for record in delegations:
        if record.get("status") == "active":
            active_count += 1
            if record.get("priority") in ("high", "critical"):
                urgent_count += 1

    summary = f"{active_count} active"
    if urgent_count:
        summary += f", {urgent_count} high priority"
    return summary
decisions_to_summary function · python · L106-L117 (12 LOC)
formatter/data_helpers.py
def decisions_to_summary(decisions: Sequence[dict]) -> str:
    """Summarise decisions in one line for the daily brief.

    Counts the decisions whose ``status`` is ``"pending_execution"``.

    Returns:
        Summary string like '2 pending' or empty string when *decisions* is
        empty.
    """
    if not decisions:
        return ""

    pending_total = sum(
        1 for record in decisions if record.get("status") == "pending_execution"
    )
    return f"{pending_total} pending"
render function · python · L11-L48 (38 LOC)
formatter/tables.py
def render(
    columns: Sequence[str],
    rows: Sequence[Sequence[str]],
    mode: str = "terminal",
    title: Optional[str] = None,
    width: int = 80,
) -> str:
    """Render a table as a string.

    Args:
        columns: Column header labels.
        rows: Row data.  Rows shorter than *columns* are padded with empty
            strings so that Rich never raises on a column-count mismatch.
        mode: "terminal" for ANSI colour output, "plain" for no escape codes.
        title: Optional table title shown above the header row.
        width: Fixed output width in columns.

    Returns:
        The rendered table string, or an empty string when *rows* is empty.
    """
    if not rows:
        return ""

    num_cols = len(columns)

    table = Table(title=title, box=BOX, header_style=HEADER_STYLE, width=width)

    for col in columns:
        table.add_column(col)

    for row in rows:
        # Pad short rows with empty strings so Rich does not crash.
        padded = list(r
audit_log_hook function · python · L20-L49 (30 LOC)
hooks/builtin.py
def audit_log_hook(context: dict) -> dict[str, Any]:
    """Append a JSON line to the audit log for every tool call.

    Works as both a before_tool_call and after_tool_call hook.
    """
    log_path = Path(AUDIT_LOG_PATH)
    log_path.parent.mkdir(parents=True, exist_ok=True)

    entry = {
        "timestamp": context.get("timestamp", ""),
        "tool_name": context.get("tool_name", ""),
        "agent_name": context.get("agent_name", ""),
        "event": "after_tool_call" if "result" in context else "before_tool_call",
    }

    # Include args only for before_tool_call (to avoid double-logging)
    if "result" not in context:
        entry["tool_args"] = context.get("tool_args", {})
    else:
        # Truncate large results to keep the log manageable
        result = context.get("result")
        result_str = str(result)
        if len(result_str) > 500:
            result_str = result_str[:500] + "...[truncated]"
        entry["result_preview"] = result_str

    with open(lo
timing_after_hook function · python · L58-L71 (14 LOC)
hooks/builtin.py
def timing_after_hook(context: dict) -> dict[str, Any] | None:
    """Report elapsed time for a tool call (after_tool_call).

    Removes the start value stored in ``_timing_store`` under the context's
    ``timestamp`` and measures against ``time.monotonic()``.
    NOTE(review): the start value is presumably written by
    timing_before_hook under the same timestamp — confirm against that hook.

    Returns:
        ``None`` when no start value is stored for the timestamp, otherwise a
        dict with ``tool_name`` and ``elapsed_seconds`` (rounded to 4 places).
    """
    started_at = _timing_store.pop(context.get("timestamp", ""), None)
    if started_at is None:
        return None
    duration = time.monotonic() - started_at
    return {
        "tool_name": context.get("tool_name", ""),
        "elapsed_seconds": round(duration, 4),
    }
HookRegistry.register_hook method · python · L40-L70 (31 LOC)
hooks/registry.py
    def register_hook(
        self,
        event_type: str,
        callback: Callable[[dict], Any],
        *,
        name: str = "",
        priority: int = 100,
        enabled: bool = True,
    ) -> None:
        """Register a hook callback for an event type.

        Args:
            event_type: One of EVENT_TYPES.
            callback: Callable receiving a context dict. May be sync or async.
            name: Optional human-readable name (used in logs).
            priority: Lower numbers run first. Default 100.
            enabled: If False, the hook is registered but will not fire.
        """
        if event_type not in EVENT_TYPES:
            raise ValueError(
                f"Unknown event type '{event_type}'. Must be one of: {sorted(EVENT_TYPES)}"
            )
        entry = {
            "name": name or getattr(callback, "__name__", "anonymous"),
            "callback": callback,
            "priority": priority,
            "enabled": enabled,
        }
        s
HookRegistry.fire_hooks method · python · L72-L92 (21 LOC)
hooks/registry.py
    def fire_hooks(self, event_type: str, context: dict) -> list[Any]:
        """Run every enabled hook registered for *event_type*.

        Each hook is called with its own shallow copy of *context*
        (``dict(context)``). A hook that raises is logged and contributes
        ``None`` to the results; the remaining hooks still run.

        Raises:
            ValueError: If *event_type* is not in EVENT_TYPES.
        """
        if event_type not in EVENT_TYPES:
            raise ValueError(
                f"Unknown event type '{event_type}'. Must be one of: {sorted(EVENT_TYPES)}"
            )
        outcomes: list[Any] = []
        for hook in self._hooks[event_type]:
            if hook["enabled"]:
                try:
                    outcome = hook["callback"](dict(context))
                except Exception:
                    logger.exception("Hook '%s' failed for event '%s'", hook["name"], event_type)
                    outcome = None
                outcomes.append(outcome)
        return outcomes
HookRegistry.get_hooks method · python · L94-L100 (7 LOC)
hooks/registry.py
    def get_hooks(self, event_type: str) -> list[dict[str, Any]]:
        """Return a new list of the hook entries for *event_type*.

        The list itself is a copy; the entry dicts inside it are the stored
        objects.

        Raises:
            ValueError: If *event_type* is not in EVENT_TYPES.
        """
        if event_type in EVENT_TYPES:
            return [*self._hooks[event_type]]
        raise ValueError(
            f"Unknown event type '{event_type}'. Must be one of: {sorted(EVENT_TYPES)}"
        )
If a scraper extracted this row, it came from Repobility (https://repobility.com)
HookRegistry.load_configs method · python · L111-L155 (45 LOC)
hooks/registry.py
    def load_configs(self, config_dir: str | Path) -> int:
        """Load hook YAML configs from *config_dir*. Returns count of hooks loaded.

        Each YAML file should contain a list of hook definitions:

            - event_type: before_tool_call
              name: my_hook
              handler: hooks.builtin.audit_log_hook
              priority: 50
              enabled: true
        """
        config_dir = Path(config_dir)
        if not config_dir.is_dir():
            logger.warning("Hook config directory does not exist: %s", config_dir)
            return 0

        loaded = 0
        for yaml_file in sorted(config_dir.glob("*.yaml")):
            try:
                with open(yaml_file) as f:
                    entries = yaml.safe_load(f)
                if not isinstance(entries, list):
                    logger.warning("Hook config %s: expected a list, got %s", yaml_file, type(entries).__name__)
                    continue
                for entry in entries:
   
_import_handler function · python · L162-L173 (12 LOC)
hooks/registry.py
def _import_handler(dotted_path: str) -> Callable | None:
    """Import a callable from a dotted module path like 'hooks.builtin.audit_log_hook'."""
    parts = dotted_path.rsplit(".", 1)
    if len(parts) != 2:
        return None
    module_path, attr_name = parts
    try:
        import importlib
        mod = importlib.import_module(module_path)
        return getattr(mod, attr_name, None)
    except (ImportError, AttributeError):
        return None
build_tool_context function · python · L176-L191 (16 LOC)
hooks/registry.py
def build_tool_context(
    tool_name: str,
    tool_args: dict,
    agent_name: str = "",
    result: Any = None,
) -> dict:
    """Assemble the context dict passed to tool-related hooks.

    The dict always carries ``tool_name``, ``tool_args``, ``agent_name`` and
    a UTC ISO-8601 ``timestamp`` taken at call time. ``result`` is added only
    when it is not ``None``.
    """
    stamp = datetime.now(timezone.utc).isoformat()
    ctx: dict[str, Any] = dict(
        tool_name=tool_name,
        tool_args=tool_args,
        agent_name=agent_name,
        timestamp=stamp,
    )
    if result is None:
        return ctx
    ctx["result"] = result
    return ctx
extract_transformed_args function · python · L194-L206 (13 LOC)
hooks/registry.py
def extract_transformed_args(hook_results: list[Any]) -> dict | None:
    """Extract transformed tool_args from before_tool_call hook results.

    Hooks that want to modify tool args return a dict with a "tool_args" key.
    The last hook that returns transformed args wins.

    Returns None if no hook returned transformed args.
    """
    transformed = None
    for result in hook_results:
        if isinstance(result, dict) and "tool_args" in result:
            transformed = result["tool_args"]
    return transformed
humanize_hook function · python · L19-L47 (29 LOC)
humanizer/hook.py
def humanize_hook(context: dict) -> dict[str, Any] | None:
    """before_tool_call hook that humanizes outbound text fields.

    Only tools listed in OUTBOUND_TOOLS are considered. Every non-empty
    string argument named in TEXT_FIELDS is passed through ``humanize``.

    Returns:
        ``{"tool_args": <copy with rewritten fields>}`` when at least one
        field changed; ``None`` when the tool is not outbound, has no args,
        or nothing changed.
    """
    if context.get("tool_name", "") not in OUTBOUND_TOOLS:
        return None

    original_args = context.get("tool_args", {})
    if not original_args:
        return None

    # Work on a shallow copy so the args in the context stay untouched.
    rewritten = dict(original_args)
    touched = False
    for name in TEXT_FIELDS:
        current = rewritten.get(name)
        if not (current and isinstance(current, str)):
            continue
        polished = humanize(current)
        if polished == current:
            continue
        rewritten[name] = polished
        touched = True

    return {"tool_args": rewritten} if touched else None
_build_rules function · python · L21-L176 (156 LOC)
humanizer/rules.py
def _build_rules() -> list[HumanizerRule]:
    """Build the default set of humanizer rules."""
    rules = []

    # --- Em dash removal ---
    rules.append(HumanizerRule(
        name="em_dash",
        pattern=re.compile(r"\s*\u2014\s*"),
        replacement=", ",
        description="Replace em dashes with commas",
    ))
    rules.append(HumanizerRule(
        name="double_hyphen_em_dash",
        pattern=re.compile(r"\s*--\s*"),
        replacement=", ",
        description="Replace double-hyphen em dashes with commas",
    ))

    # --- AI vocabulary swaps ---
    vocab_swaps = [
        (r"\bAdditionally\b", "Also"),
        (r"\badditionally\b", "also"),
        (r"\butilize\b", "use"),
        (r"\bUtilize\b", "Use"),
        (r"\butilizing\b", "using"),
        (r"\bUtilizing\b", "Using"),
        (r"\bleverage\b", "use"),
        (r"\bLeverage\b", "Use"),
        (r"\bleveraging\b", "using"),
        (r"\bLeveraging\b", "Using"),
        (r"\bfacilitate\b", "help with"),
  
humanize function · python · L182-L209 (28 LOC)
humanizer/rules.py
def humanize(text: str | None, rules: list[HumanizerRule] | None = None) -> str:
    """Run the humanizer rules over *text* and tidy the leftover whitespace.

    Args:
        text: Input text to humanize. None (or any empty value) returns an
            empty string.
        rules: Optional custom rule list. Defaults to DEFAULT_RULES.

    Returns:
        The text after every rule's substitution, whitespace cleanup, and a
        final strip.
    """
    if not text:
        return ""

    active_rules = DEFAULT_RULES if rules is None else rules

    cleaned = text
    for rule in active_rules:
        cleaned = rule.pattern.sub(rule.replacement, cleaned)

    # Post-processing passes, applied in order:
    #   1. collapse runs of spaces left by removals,
    #   2. drop a space that ended up before punctuation,
    #   3. remove leading spaces on every line.
    tidy_passes = (
        (r"  +", " "),
        (r" ([.,;:!?])", r"\1"),
        (r"(?m)^ +", ""),
    )
    for pattern, replacement in tidy_passes:
        cleaned = re.sub(pattern, replacement, cleaned)

    return cleaned.strip()
app_lifespan function · python · L46-L190 (145 LOC)
mcp_server.py
async def app_lifespan(server: FastMCP):
    """Initialize shared resources on startup, clean up on shutdown."""
    app_config.DATA_DIR.mkdir(parents=True, exist_ok=True)
    app_config.AGENT_CONFIGS_DIR.mkdir(parents=True, exist_ok=True)
    routing_db_candidate = getattr(app_config, "CALENDAR_ROUTING_DB_PATH", None)
    routing_db_path = Path(routing_db_candidate) if isinstance(routing_db_candidate, (str, Path)) else (app_config.DATA_DIR / "calendar-routing.db")
    dual_read_candidate = getattr(app_config, "CALENDAR_REQUIRE_DUAL_READ", True)
    require_dual_read = bool(dual_read_candidate) if isinstance(dual_read_candidate, bool) else True
    claude_bin_candidate = getattr(app_config, "CLAUDE_BIN", "claude")
    claude_bin = claude_bin_candidate if isinstance(claude_bin_candidate, str) and claude_bin_candidate else "claude"
    claude_mcp_candidate = getattr(app_config, "CLAUDE_MCP_CONFIG", "")
    claude_mcp_config = claude_mcp_candidate if isinstance(claude_mcp_candidate, str) 
All rows above produced by Repobility · https://repobility.com
register function · python · L9-L163 (155 LOC)
mcp_tools/agent_tools.py
def register(mcp, state):
    """Register agent tools with the FastMCP server."""

    @mcp.tool()
    async def list_agents() -> str:
        """List all available expert agent configurations."""
        agent_registry = state.agent_registry
        agents = agent_registry.list_agents()
        if not agents:
            return json.dumps({"message": "No agents configured yet.", "results": []})
        results = [
            {"name": a.name, "description": a.description, "capabilities": a.capabilities}
            for a in agents
        ]
        return json.dumps({"results": results})

    @mcp.tool()
    async def get_agent(name: str) -> str:
        """Get full details for a specific expert agent by name.

        Args:
            name: The agent name to look up
        """
        agent_registry = state.agent_registry
        agent = agent_registry.get_agent(name)
        if not agent:
            return json.dumps({"error": f"Agent '{name}' not found."})
        return json.du
register function · python · L18-L82 (65 LOC)
mcp_tools/brain_tools.py
def register(mcp, state):
    global _state_ref
    _state_ref = state

    @mcp.tool()
    async def get_session_brain() -> str:
        """Get the current Session Brain -- persistent cross-session context.

        Returns the full brain state: active workstreams, open action items,
        recent decisions, key people context, and session handoff notes.
        """
        brain = _get_brain()
        if brain is None:
            return json.dumps({"error": "Session brain not initialized"})
        brain.load()
        return json.dumps(brain.to_dict())

    @mcp.tool()
    async def update_session_brain(action: str, data: str) -> str:
        """Update the Session Brain with new information.

        Args:
            action: One of: add_workstream, update_workstream, add_action_item,
                    complete_action_item, add_decision, add_person, add_handoff_note
            data: JSON string with action-specific fields:
                - add_workstream: {"name", "status", "c
register function · python · L11-L71 (61 LOC)
mcp_tools/channel_tools.py
def register(mcp, state):
    """Register channel tools with the FastMCP server."""

    @mcp.tool()
    async def list_inbound_events(
        channel: str = "",
        event_type: str = "",
        limit: int = 25,
    ) -> str:
        """List recent inbound events normalized across channels (iMessage, Mail, Webhook).

        Args:
            channel: Filter by channel ("imessage", "mail", "webhook"). Leave empty for all.
            event_type: Filter by event type ("message", "email", "webhook_event"). Leave empty for all.
            limit: Maximum events per channel (default 25, max 100)
        """
        limit = max(1, min(limit, 100))
        channels_to_query = (
            [channel] if channel else ["imessage", "mail", "webhook"]
        )
        all_events = []

        for ch in channels_to_query:
            raw_events = _fetch_raw_events(state, ch, limit)
            for raw in raw_events:
                if "error" in raw:
                    continue
           
_fetch_raw_events function · python · L74-L107 (34 LOC)
mcp_tools/channel_tools.py
def _fetch_raw_events(state, channel: str, limit: int) -> list[dict]:
    """Fetch raw events from the appropriate store for a channel."""
    try:
        if channel == "imessage":
            messages_store = state.messages_store
            if messages_store is None:
                return []
            return messages_store.get_messages(minutes=24 * 60, limit=limit)
        elif channel == "mail":
            mail_store = state.mail_store
            if mail_store is None:
                return []
            return mail_store.get_messages(limit=limit)
        elif channel == "webhook":
            memory_store = state.memory_store
            if memory_store is None:
                return []
            events = memory_store.list_webhook_events(limit=limit)
            return [
                {
                    "id": e.id,
                    "source": e.source,
                    "event_type": e.event_type,
                    "payload": e.payload,
                    "st
register function · python · L14-L78 (65 LOC)
mcp_tools/document_tools.py
def register(mcp, state):
    """Register document tools with the FastMCP server."""

    @mcp.tool()
    async def search_documents(query: str, top_k: int = 5) -> str:
        """Semantic search over ingested documents. Returns the most relevant chunks.

        Args:
            query: Natural language search query
            top_k: Number of results to return (default 5)
        """
        document_store = state.document_store
        try:
            results = _retry_on_transient(document_store.search, query, top_k=top_k)

            if not results:
                return json.dumps({"message": "No documents found. Ingest documents first.", "results": []})

            try:
                state.memory_store.record_skill_usage("search_documents", query)
            except Exception:
                pass
            return json.dumps({"results": results})
        except (sqlite3.OperationalError, ValueError, KeyError) as e:
            return json.dumps({"error": f"Database error
register function · python · L10-L121 (112 LOC)
mcp_tools/enrichment.py
def register(mcp, state):
    """Register enrichment tools with the MCP server."""

    @mcp.tool()
    async def enrich_person(name: str, days_back: int = 7) -> str:
        """Get a consolidated profile for a person: identities, facts, delegations, decisions, recent messages, and emails.

        Fetches from 6 data sources in parallel. Much faster than calling each tool separately.
        Empty sections are omitted. If a data source is unavailable, that section is silently skipped.

        Args:
            name: Person's name to search for (canonical name or partial match)
            days_back: How many days back to search communications (default 7)
        """
        context = {"name": name}
        minutes = days_back * 1440

        async def fetch_identities():
            try:
                results = state.memory_store.search_identity(name)
                if results:
                    return ("identities", results[:10])
            except Exception as e:
             
register function · python · L14-L200 (187 LOC)
mcp_tools/event_rule_tools.py
def register(mcp, state):
    """Register event rule tools with the FastMCP server."""

    @mcp.tool()
    async def create_event_rule(
        name: str,
        event_source: str,
        event_type_pattern: str,
        agent_name: str,
        description: str = "",
        agent_input_template: str = "",
        delivery_channel: str = "",
        delivery_config: str = "",
        enabled: bool = True,
        priority: int = 100,
    ) -> str:
        """Create an event rule that triggers an agent when a matching webhook event arrives.

        Args:
            name: Unique name for this rule (required)
            event_source: Source to match (e.g. "github", "jira") (required)
            event_type_pattern: Glob pattern for event types (e.g. "alert.*", "incident.critical") (required)
            agent_name: Name of the expert agent to activate (required)
            description: Human-readable description of what this rule does
            agent_input_template: Template for
register function · python · L18-L150 (133 LOC)
mcp_tools/formatter_tools.py
def register(mcp, state):
    """Register formatter tools with the FastMCP server."""

    @mcp.tool()
    async def format_table(title: str, columns: str, rows: str, mode: str = "terminal") -> str:
        """Render a formatted table from structured data.

        Args:
            title: Table title
            columns: JSON array of column header names (e.g. '["Time", "Event", "Status"]')
            rows: JSON array of row arrays (e.g. '[["8:30 AM", "ePMLT", "Zoom"]]')
            mode: Render mode — "terminal" for ANSI color, "plain" for no-color text (default: terminal)
        """
        try:
            from formatter.tables import render
            parsed_columns = json.loads(columns)
            parsed_rows = json.loads(rows)
            result = render(
                title=title,
                columns=parsed_columns,
                rows=parsed_rows,
                mode=mode,
            )
            if not result:
                return json.dumps({"result": ""})
  
Methodology: Repobility · https://repobility.com/research/state-of-ai-code-2026/
register function · python · L6-L90 (85 LOC)
mcp_tools/identity_tools.py
def register(mcp, state):
    """Register identity linking tools with the MCP server."""

    @mcp.tool()
    async def link_identity(
        canonical_name: str,
        provider: str,
        provider_id: str,
        display_name: str = "",
        email: str = "",
    ) -> str:
        """Link a provider identity to a canonical person name.

        Maps a person's account on a specific provider (iMessage, email, Teams, etc.)
        to their canonical name, enabling cross-channel identity resolution.

        Args:
            canonical_name: The person's canonical name (e.g. "Jane Smith")
            provider: Provider name: imessage, email, m365_teams, m365_email, slack, jira, confluence
            provider_id: Unique ID on the provider (phone number, email, user ID, etc.)
            display_name: Optional display name on the provider
            email: Optional email address associated with this identity
        """
        try:
            result = state.memory_store.link_i
register function · python · L6-L176 (171 LOC)
mcp_tools/imessage_tools.py
def register(mcp, state):
    """Register iMessage tools with the MCP server."""

    @mcp.tool()
    async def get_imessages(
        minutes: int = 60,
        limit: int = 25,
        include_from_me: bool = True,
        conversation: str = "",
    ) -> str:
        """Get recent iMessages from Messages.app chat history.

        Args:
            minutes: Lookback window in minutes (default: 60)
            limit: Maximum number of messages to return (default: 25, max: 200)
            include_from_me: Include your own sent messages (default: True)
            conversation: Optional sender/chat identifier filter
        """
        messages_store = state.messages_store
        try:
            messages = messages_store.get_messages(
                minutes=minutes,
                limit=limit,
                include_from_me=include_from_me,
                conversation=conversation,
            )
            return json.dumps({"results": messages})
        except Exception as e:
_format_delegations function · python · L11-L24 (14 LOC)
mcp_tools/lifecycle_tools.py
def _format_delegations(results):
    """Add formatted table to delegation results."""
    try:
        from formatter.data_helpers import delegations_to_table_data
        from formatter.tables import render
        items = results.get("results", [])
        if not items:
            results["formatted"] = ""
            return results
        columns, rows = delegations_to_table_data(items)
        results["formatted"] = render(columns=columns, rows=rows, title="Delegations", mode="plain")
    except Exception:
        results["formatted"] = ""
    return results
‹ prevpage 5 / 11next ›