← back to jrichyrich__chief-of-staff

Function bodies 522 total

All specs Real LLM only Function bodies
BaseExpertAgent.__init__ method · python · L22-L43 (22 LOC)
agents/base.py
    def __init__(
        self,
        config: AgentConfig,
        memory_store: MemoryStore,
        document_store: DocumentStore,
        client: Optional[anthropic.AsyncAnthropic] = None,
        calendar_store=None,
        reminder_store=None,
        notifier=None,
        mail_store=None,
        hook_registry=None,
    ):
        self.config = config
        self.name = config.name
        self.memory_store = memory_store
        self.document_store = document_store
        self.calendar_store = calendar_store
        self.reminder_store = reminder_store
        self.notifier = notifier
        self.mail_store = mail_store
        self.hook_registry = hook_registry
        self.client = client or anthropic.AsyncAnthropic(api_key=app_config.ANTHROPIC_API_KEY)
BaseExpertAgent.build_system_prompt method · python · L45-L73 (29 LOC)
agents/base.py
    def build_system_prompt(self) -> str:
        prompt = self.config.system_prompt

        # Inject runtime context: agent identity and current date
        today = date.today().isoformat()
        prompt += f"\n\n## Runtime Context\n- Agent name: {self.name}\n- Today's date: {today}"

        try:
            memories = self.memory_store.get_agent_memories(self.name)
        except Exception:
            memories = []
        if memories:
            lines = ["\n\n## Agent Memory (retained from previous runs)"]
            for m in memories:
                lines.append(f"- {m.key}: {m.value}")
            prompt += "\n".join(lines)

        # Inject shared namespace memories
        for ns in getattr(self.config, "namespaces", []):
            try:
                shared = self.memory_store.get_shared_memories(ns)
            except Exception:
                shared = []
            if shared:
                lines = [f"\n\n## Shared Memory [{ns}]"]
                for m in shared
BaseExpertAgent.execute method · python · L78-L134 (57 LOC)
agents/base.py
    async def execute(self, task: str) -> str:
        messages = [{"role": "user", "content": task}]
        tools = self.get_tools()
        loop_detector = LoopDetector()

        for _round in range(MAX_TOOL_ROUNDS):
            response = await self._call_api(messages, tools)

            # Check if the model wants to use a tool
            if response.stop_reason == "tool_use":
                # Process tool calls
                assistant_content = response.content
                messages.append({"role": "assistant", "content": assistant_content})

                tool_results = []
                should_break = False
                for block in assistant_content:
                    if block.type == "tool_use":
                        signal = loop_detector.record(block.name, block.input)
                        if signal == "break":
                            should_break = True
                            tool_results.append({
                                "type": "tool_
BaseExpertAgent._call_api method · python · L137-L150 (14 LOC)
agents/base.py
    async def _call_api(self, messages: list, tools: list) -> Any:
        model_id = app_config.MODEL_TIERS.get(
            self.config.model,
            app_config.MODEL_TIERS[app_config.DEFAULT_MODEL_TIER],
        )
        kwargs = {
            "model": model_id,
            "max_tokens": self.config.max_tokens,
            "system": self.build_system_prompt(),
            "messages": messages,
        }
        if tools:
            kwargs["tools"] = tools
        return await self.client.messages.create(**kwargs)
BaseExpertAgent._fire_hooks method · python · L152-L159 (8 LOC)
agents/base.py
    def _fire_hooks(self, event_type: str, context: dict) -> list:
        """Fire hooks if a hook_registry is available. Error-isolated."""
        if self.hook_registry is None:
            return []
        try:
            return self.hook_registry.fire_hooks(event_type, context)
        except Exception:
            return []
BaseExpertAgent._handle_tool_call method · python · L161-L205 (45 LOC)
agents/base.py
    def _handle_tool_call(self, tool_name: str, tool_input: dict) -> Any:
        from hooks.registry import build_tool_context, extract_transformed_args

        # Fire before_tool_call hooks
        before_ctx = build_tool_context(
            tool_name=tool_name,
            tool_args=tool_input,
            agent_name=self.name,
        )
        hook_results = self._fire_hooks("before_tool_call", before_ctx)

        # Apply arg transformations from before_tool_call hooks
        transformed = extract_transformed_args(hook_results)
        if transformed is not None:
            tool_input = transformed

        # Enforce capability boundaries
        allowed_tools = {t["name"] for t in self.get_tools()}
        if tool_name not in allowed_tools:
            result = {"error": f"Tool '{tool_name}' not permitted for agent '{self.name}'"}
            after_ctx = build_tool_context(
                tool_name=tool_name,
                tool_args=tool_input,
                agent_name=
BaseExpertAgent._dispatch_tool method · python · L207-L315 (109 LOC)
agents/base.py
    def _dispatch_tool(self, tool_name: str, tool_input: dict) -> Any:
        """Route a tool call to the appropriate handler. Returns the result."""
        if tool_name == "query_memory":
            return execute_query_memory(
                self.memory_store, tool_input["query"], tool_input.get("category")
            )

        elif tool_name == "store_memory":
            return execute_store_memory(
                self.memory_store,
                tool_input["category"],
                tool_input["key"],
                tool_input["value"],
                source=self.name,
            )

        elif tool_name == "search_documents":
            return execute_search_documents(
                self.document_store, tool_input["query"], tool_input.get("top_k", 5)
            )

        elif tool_name == "create_decision":
            return self._handle_create_decision(tool_input)

        elif tool_name == "search_decisions":
            return self._handle_search_decisions
Want this analysis on your repo? https://repobility.com/scan/
BaseExpertAgent._handle_create_decision method · python · L317-L329 (13 LOC)
agents/base.py
    def _handle_create_decision(self, tool_input: dict) -> Any:
        return lifecycle_tools.create_decision(
            self.memory_store,
            title=tool_input["title"],
            description=tool_input.get("description", ""),
            context=tool_input.get("context", ""),
            decided_by=tool_input.get("decided_by", ""),
            owner=tool_input.get("owner", ""),
            status=tool_input.get("status", "pending_execution"),
            follow_up_date=tool_input.get("follow_up_date", ""),
            tags=tool_input.get("tags", ""),
            source=tool_input.get("source", self.name),
        )
BaseExpertAgent._handle_search_decisions method · python · L331-L336 (6 LOC)
agents/base.py
    def _handle_search_decisions(self, tool_input: dict) -> Any:
        return lifecycle_tools.search_decisions(
            self.memory_store,
            query=tool_input.get("query", ""),
            status=tool_input.get("status", ""),
        )
BaseExpertAgent._handle_update_decision method · python · L338-L344 (7 LOC)
agents/base.py
    def _handle_update_decision(self, tool_input: dict) -> Any:
        return lifecycle_tools.update_decision(
            self.memory_store,
            decision_id=tool_input["decision_id"],
            status=tool_input.get("status", ""),
            notes=tool_input.get("notes", ""),
        )
BaseExpertAgent._handle_create_delegation method · python · L355-L364 (10 LOC)
agents/base.py
    def _handle_create_delegation(self, tool_input: dict) -> Any:
        return lifecycle_tools.create_delegation(
            self.memory_store,
            task=tool_input["task"],
            delegated_to=tool_input["delegated_to"],
            description=tool_input.get("description", ""),
            due_date=tool_input.get("due_date", ""),
            priority=tool_input.get("priority", "medium"),
            source=tool_input.get("source", self.name),
        )
BaseExpertAgent._handle_list_delegations method · python · L366-L371 (6 LOC)
agents/base.py
    def _handle_list_delegations(self, tool_input: dict) -> Any:
        return lifecycle_tools.list_delegations(
            self.memory_store,
            status=tool_input.get("status", ""),
            delegated_to=tool_input.get("delegated_to", ""),
        )
BaseExpertAgent._handle_update_delegation method · python · L373-L379 (7 LOC)
agents/base.py
    def _handle_update_delegation(self, tool_input: dict) -> Any:
        return lifecycle_tools.update_delegation(
            self.memory_store,
            delegation_id=tool_input["delegation_id"],
            status=tool_input.get("status", ""),
            notes=tool_input.get("notes", ""),
        )
BaseExpertAgent._handle_create_alert_rule method · python · L390-L398 (9 LOC)
agents/base.py
    def _handle_create_alert_rule(self, tool_input: dict) -> Any:
        return lifecycle_tools.create_alert_rule(
            self.memory_store,
            name=tool_input["name"],
            alert_type=tool_input["alert_type"],
            description=tool_input.get("description", ""),
            condition=tool_input.get("condition", ""),
            enabled=tool_input.get("enabled", True),
        )
BaseExpertAgent._handle_calendar_get_events method · python · L415-L428 (14 LOC)
agents/base.py
    def _handle_calendar_get_events(self, tool_input: dict) -> Any:
        if self.calendar_store is None:
            return {"error": "Calendar not available (macOS only)"}
        from datetime import datetime
        start_dt = datetime.fromisoformat(tool_input["start_date"])
        end_dt = datetime.fromisoformat(tool_input["end_date"])
        calendar_names = [tool_input["calendar_name"]] if tool_input.get("calendar_name") else None
        return self.calendar_store.get_events(
            start_dt,
            end_dt,
            calendar_names=calendar_names,
            provider_preference=tool_input.get("provider_preference", "auto"),
            source_filter=tool_input.get("source_filter", ""),
        )
Generated by Repobility's multi-pass static-analysis pipeline (https://repobility.com)
BaseExpertAgent._handle_calendar_search method · python · L430-L443 (14 LOC)
agents/base.py
    def _handle_calendar_search(self, tool_input: dict) -> Any:
        if self.calendar_store is None:
            return {"error": "Calendar not available (macOS only)"}
        from datetime import datetime, timedelta
        now = datetime.now()
        start_dt = datetime.fromisoformat(tool_input["start_date"]) if tool_input.get("start_date") else now - timedelta(days=30)
        end_dt = datetime.fromisoformat(tool_input["end_date"]) if tool_input.get("end_date") else now + timedelta(days=30)
        return self.calendar_store.search_events(
            tool_input["query"],
            start_dt,
            end_dt,
            provider_preference=tool_input.get("provider_preference", "auto"),
            source_filter=tool_input.get("source_filter", ""),
        )
BaseExpertAgent._handle_reminder_list method · python · L445-L451 (7 LOC)
agents/base.py
    def _handle_reminder_list(self, tool_input: dict) -> Any:
        if self.reminder_store is None:
            return {"error": "Reminders not available (macOS only)"}
        return self.reminder_store.list_reminders(
            list_name=tool_input.get("list_name"),
            completed=tool_input.get("completed"),
        )
BaseExpertAgent._handle_reminder_search method · python · L453-L459 (7 LOC)
agents/base.py
    def _handle_reminder_search(self, tool_input: dict) -> Any:
        if self.reminder_store is None:
            return {"error": "Reminders not available (macOS only)"}
        return self.reminder_store.search_reminders(
            query=tool_input["query"],
            include_completed=tool_input.get("include_completed", False),
        )
BaseExpertAgent._handle_reminder_create method · python · L461-L470 (10 LOC)
agents/base.py
    def _handle_reminder_create(self, tool_input: dict) -> Any:
        if self.reminder_store is None:
            return {"error": "Reminders not available (macOS only)"}
        return self.reminder_store.create_reminder(
            title=tool_input["title"],
            list_name=tool_input.get("list_name"),
            due_date=tool_input.get("due_date"),
            priority=tool_input.get("priority"),
            notes=tool_input.get("notes"),
        )
BaseExpertAgent._handle_send_notification method · python · L477-L485 (9 LOC)
agents/base.py
    def _handle_send_notification(self, tool_input: dict) -> Any:
        if self.notifier is None:
            return {"error": "Notifications not available (macOS only)"}
        return self.notifier.send(
            title=tool_input["title"],
            message=tool_input["message"],
            subtitle=tool_input.get("subtitle"),
            sound=tool_input.get("sound", "default"),
        )
BaseExpertAgent._handle_mail_get_messages method · python · L487-L494 (8 LOC)
agents/base.py
    def _handle_mail_get_messages(self, tool_input: dict) -> Any:
        if self.mail_store is None:
            return {"error": "Mail not available (macOS only)"}
        return self.mail_store.get_messages(
            mailbox=tool_input.get("mailbox", "INBOX"),
            account=tool_input.get("account", ""),
            limit=tool_input.get("limit", 25),
        )
BaseExpertAgent._handle_mail_search method · python · L501-L509 (9 LOC)
agents/base.py
    def _handle_mail_search(self, tool_input: dict) -> Any:
        if self.mail_store is None:
            return {"error": "Mail not available (macOS only)"}
        return self.mail_store.search_messages(
            query=tool_input["query"],
            mailbox=tool_input.get("mailbox", "INBOX"),
            account=tool_input.get("account", ""),
            limit=tool_input.get("limit", 25),
        )
BaseExpertAgent._handle_mail_get_unread_count method · python · L511-L522 (12 LOC)
agents/base.py
    def _handle_mail_get_unread_count(self, tool_input: dict) -> Any:
        if self.mail_store is None:
            return {"error": "Mail not available (macOS only)"}
        mailbox = tool_input.get("mailbox", "INBOX")
        account = tool_input.get("account", "")
        mailboxes = self.mail_store.list_mailboxes()
        for mb in mailboxes:
            if isinstance(mb, dict) and mb.get("name") == mailbox:
                if account and mb.get("account") != account:
                    continue
                return {"mailbox": mailbox, "unread_count": mb.get("unread_count", 0)}
        return {"mailbox": mailbox, "unread_count": 0}
Citation: Repobility (2026). State of AI-Generated Code. https://repobility.com/research/
BaseExpertAgent._handle_mail_send method · python · L524-L537 (14 LOC)
agents/base.py
    def _handle_mail_send(self, tool_input: dict) -> Any:
        if self.mail_store is None:
            return {"error": "Mail not available (macOS only)"}
        to_list = [a.strip() for a in tool_input["to"].split(",") if a.strip()]
        cc_list = [a.strip() for a in tool_input.get("cc", "").split(",") if a.strip()] or None
        bcc_list = [a.strip() for a in tool_input.get("bcc", "").split(",") if a.strip()] or None
        return self.mail_store.send_message(
            to=to_list,
            subject=tool_input["subject"],
            body=tool_input["body"],
            cc=cc_list,
            bcc=bcc_list,
            confirm_send=False,  # Agents must never auto-send; only MCP server (human-in-loop) can confirm
        )
BaseExpertAgent._handle_mail_mark_read method · python · L539-L545 (7 LOC)
agents/base.py
    def _handle_mail_mark_read(self, tool_input: dict) -> Any:
        if self.mail_store is None:
            return {"error": "Mail not available (macOS only)"}
        return self.mail_store.mark_read(
            message_id=tool_input["message_id"],
            read=tool_input.get("read", True),
        )
BaseExpertAgent._handle_mail_mark_flagged method · python · L547-L553 (7 LOC)
agents/base.py
    def _handle_mail_mark_flagged(self, tool_input: dict) -> Any:
        if self.mail_store is None:
            return {"error": "Mail not available (macOS only)"}
        return self.mail_store.mark_flagged(
            message_id=tool_input["message_id"],
            flagged=tool_input.get("flagged", True),
        )
BaseExpertAgent._handle_mail_move_message method · python · L555-L562 (8 LOC)
agents/base.py
    def _handle_mail_move_message(self, tool_input: dict) -> Any:
        if self.mail_store is None:
            return {"error": "Mail not available (macOS only)"}
        return self.mail_store.move_message(
            message_id=tool_input["message_id"],
            target_mailbox=tool_input["target_mailbox"],
            target_account=tool_input.get("target_account", ""),
        )
AgentFactory.create_agent method · python · L35-L63 (29 LOC)
agents/factory.py
    def create_agent(self, description: str) -> AgentConfig:
        client = anthropic.Anthropic(api_key=app_config.ANTHROPIC_API_KEY)
        response = client.messages.create(
            model=app_config.MODEL_TIERS["haiku"],
            max_tokens=1024,
            system=AGENT_CREATION_PROMPT,
            messages=[{"role": "user", "content": f"I need an agent for: {description}"}],
        )

        raw = response.content[0].text
        data = json.loads(raw)
        capabilities = validate_capabilities(data.get("capabilities", ["memory_read"]))
        # Filter out restricted capabilities and cap count for auto-generated agents
        capabilities = [c for c in capabilities if c not in RESTRICTED_CAPABILITIES]
        capabilities = capabilities[:MAX_AUTO_CAPABILITIES]

        config = AgentConfig(
            name=data["name"],
            description=data["description"],
            system_prompt=data["system_prompt"],
            capabilities=capabilities,
            te
LoopDetector.record method · python · L27-L45 (19 LOC)
agents/loop_detector.py
    def record(self, tool_name: str, tool_args: dict) -> str:
        """Record a tool call and return 'ok', 'warning', or 'break'."""
        key = self._make_key(tool_name, tool_args)
        self._counts[key] += 1
        self._history.append(key)

        count = self._counts[key]
        if count >= self.break_threshold:
            return "break"
        if count >= self.warn_threshold:
            return "warning"

        # Check A-B-A-B alternation (need at least 4 entries)
        if len(self._history) >= 4:
            h = self._history
            if (h[-1] == h[-3] and h[-2] == h[-4] and h[-1] != h[-2]):
                return "warning"

        return "ok"
AgentRegistry._ensure_cache method · python · L36-L45 (10 LOC)
agents/registry.py
    def _ensure_cache(self) -> None:
        """Load all configs into cache on first access."""
        if self._cache_loaded:
            return
        self._cache.clear()
        for path in sorted(self.configs_dir.glob("*.yaml")):
            config = self._load_yaml(path)
            if config:
                self._cache[config.name] = config
        self._cache_loaded = True
AgentRegistry._validate_name method · python · L56-L61 (6 LOC)
agents/registry.py
    def _validate_name(name: str) -> None:
        if not VALID_AGENT_NAME.match(name):
            raise ValueError(
                f"Invalid agent name '{name}': must be lowercase alphanumeric, "
                "underscores, or hyphens, starting with a letter or digit"
            )
Methodology: Repobility · https://repobility.com/research/state-of-ai-code-2026/
AgentRegistry.save_agent method · python · L68-L90 (23 LOC)
agents/registry.py
    def save_agent(self, config: AgentConfig) -> Path:
        self._validate_name(config.name)
        normalized_capabilities = validate_capabilities(config.capabilities)
        config.capabilities = normalized_capabilities
        path = self.configs_dir / f"{config.name}.yaml"
        data = {
            "name": config.name,
            "description": config.description,
            "system_prompt": config.system_prompt,
            "capabilities": normalized_capabilities,
            "temperature": config.temperature,
            "max_tokens": config.max_tokens,
            "model": config.model,
        }
        if config.namespaces:
            data["namespaces"] = config.namespaces
        if config.created_by:
            data["created_by"] = config.created_by
        if config.created_at:
            data["created_at"] = config.created_at
        path.write_text(yaml.dump(data, default_flow_style=False))
        self._invalidate_cache()
        return path
AgentRegistry._load_yaml method · python · L97-L114 (18 LOC)
agents/registry.py
    def _load_yaml(self, path: Path) -> Optional[AgentConfig]:
        try:
            data = yaml.safe_load(path.read_text())
            capabilities = validate_capabilities(data.get("capabilities", []))
            return AgentConfig(
                name=data["name"],
                description=data.get("description", ""),
                system_prompt=data.get("system_prompt", ""),
                capabilities=capabilities,
                namespaces=data.get("namespaces", []),
                temperature=data.get("temperature", 0.3),
                max_tokens=data.get("max_tokens", 4096),
                model=data.get("model", "sonnet"),
                created_by=data.get("created_by"),
                created_at=data.get("created_at"),
            )
        except (yaml.YAMLError, KeyError, ValueError):
            return None
classify_complexity function · python · L30-L62 (33 LOC)
agents/triage.py
def classify_complexity(
    task_text: str,
    agent_config: AgentConfig,
    client: Optional[anthropic.Anthropic] = None,
) -> str:
    """Classify task complexity using a Haiku pre-call.

    Returns 'simple', 'standard', or 'complex'.
    On any error, returns 'standard' (safe fallback).
    """
    if client is None:
        client = anthropic.Anthropic(api_key=app_config.ANTHROPIC_API_KEY)

    prompt = _TRIAGE_PROMPT.format(
        name=agent_config.name,
        description=agent_config.description,
        task=task_text[:1000],
    )

    try:
        response = client.messages.create(
            model=app_config.MODEL_TIERS["haiku"],
            max_tokens=10,
            messages=[{"role": "user", "content": prompt}],
        )
        text = response.content[0].text.strip().lower()
        if text in _VALID_CLASSIFICATIONS:
            return text
        logger.warning("Triage returned unexpected value: %r, defaulting to standard", text)
        return "standard"
    
classify_and_resolve function · python · L69-L93 (25 LOC)
agents/triage.py
def classify_and_resolve(
    agent_config: AgentConfig,
    task_text: str,
    client: Optional[anthropic.Anthropic] = None,
) -> AgentConfig:
    """Classify task complexity and return a (possibly downgraded) config.

    Skips triage for haiku agents (already cheapest) and opus agents (reserved).
    Returns a copy with model overridden if downgraded; original config is never mutated.
    On error, returns the original config unchanged.
    """
    if agent_config.model in _SKIP_TRIAGE_TIERS:
        return agent_config

    classification = classify_complexity(task_text, agent_config, client=client)

    if classification == "simple":
        logger.info(
            "Triage: agent=%s classification=simple, downgrading from %s to haiku",
            agent_config.name,
            agent_config.model,
        )
        return replace(agent_config, model="haiku")

    return agent_config
_event_to_dict function · python · L34-L58 (25 LOC)
apple_calendar/eventkit.py
def _event_to_dict(event, calendar_name: str) -> dict:
    """Convert an EKEvent to a plain dict."""
    attendees = []
    if event.attendees():
        for a in event.attendees():
            attendees.append({
                "name": str(a.name()) if a.name() else None,
                "email": str(a.URL().resourceSpecifier()) if a.URL() else None,
                "status": int(a.participantStatus()),
            })

    start = event.startDate()
    end = event.endDate()

    return {
        "uid": str(event.calendarItemExternalIdentifier()),
        "title": str(event.title()) if event.title() else "",
        "start": datetime.fromtimestamp(start.timeIntervalSince1970()).isoformat() if start else None,
        "end": datetime.fromtimestamp(end.timeIntervalSince1970()).isoformat() if end else None,
        "location": str(event.location()) if event.location() else None,
        "notes": str(event.notes()) if event.notes() else None,
        "attendees": attendees,
        "calend
CalendarStore._ensure_store method · python · L76-L93 (18 LOC)
apple_calendar/eventkit.py
    def _ensure_store(self) -> Optional[dict]:
        """Lazily create EKEventStore and request access.

        Returns None on success, or an error dict if unavailable.
        """
        if not _EVENTKIT_AVAILABLE:
            return _PLATFORM_ERROR

        if self._store is None:
            self._store = EventKit.EKEventStore.alloc().init()

        if self._access_granted is None:
            self._access_granted = self._request_access()

        if not self._access_granted:
            return _PERMISSION_ERROR

        return None
CalendarStore._request_access method · python · L95-L111 (17 LOC)
apple_calendar/eventkit.py
    def _request_access(self) -> bool:
        """Request calendar access synchronously.

        Uses the completion-handler API and waits on a simple flag.
        """
        import threading

        granted_flag = threading.Event()
        result = {"granted": False}

        def handler(granted, error):
            result["granted"] = granted
            granted_flag.set()

        self._store.requestFullAccessToEventsWithCompletion_(handler)
        granted_flag.wait(timeout=30)
        return result["granted"]
CalendarStore._get_calendar_by_name method · python · L113-L133 (21 LOC)
apple_calendar/eventkit.py
    def _get_calendar_by_name(self, name: str, source: Optional[str] = None):
        """Find an EKCalendar by display title (and optionally source), or None.

        Resolves calendar aliases from config before matching.  When two
        calendars share the same display name (e.g. iCloud "Calendar" and
        Exchange "Calendar"), the alias provides the source to disambiguate.
        """
        from config import CALENDAR_ALIASES

        alias = CALENDAR_ALIASES.get((name or "").strip().lower())
        if alias:
            name = alias["name"]
            source = source or alias.get("source")

        for cal in self._store.calendarsForEntityType_(0):  # 0 = Event
            if cal.title() == name:
                if source and cal.source():
                    if str(cal.source().title()).lower() != source.lower():
                        continue
                return cal
        return None
Want this analysis on your repo? https://repobility.com/scan/
CalendarStore._find_event_by_uid method · python · L135-L156 (22 LOC)
apple_calendar/eventkit.py
    def _find_event_by_uid(self, uid: str, calendar_name: Optional[str] = None):
        """Find a single EKEvent by its external identifier."""
        event = self._store.calendarItemWithIdentifier_(uid)
        if event is not None:
            return event
        # Fallback: search recent range for the UID
        from datetime import timedelta
        now = datetime.now()
        start = _ns_date(now - timedelta(days=365 * 2))
        end = _ns_date(now + timedelta(days=365 * 2))
        calendars = None
        if calendar_name:
            cal = self._get_calendar_by_name(calendar_name)
            if cal:
                calendars = [cal]
        predicate = self._store.predicateForEventsWithStartDate_endDate_calendars_(
            start, end, calendars
        )
        for ev in self._store.eventsMatchingPredicate_(predicate):
            if str(ev.calendarItemExternalIdentifier()) == uid:
                return ev
        return None
CalendarStore.list_calendars method · python · L162-L193 (32 LOC)
apple_calendar/eventkit.py
    def list_calendars(self) -> list[dict]:
        """Return all calendars with name, type, source, and color."""
        err = self._ensure_store()
        if err:
            return [err]

        try:
            calendars = self._store.calendarsForEntityType_(0)  # 0 = Event
            result = []
            for cal in calendars:
                color = cal.color()
                color_hex = None
                if color:
                    r = int(color.redComponent() * 255)
                    g = int(color.greenComponent() * 255)
                    b = int(color.blueComponent() * 255)
                    color_hex = f"#{r:02x}{g:02x}{b:02x}"

                cal_type_int = cal.type()
                cal_type_map = {0: "local", 1: "calDAV", 2: "exchange", 3: "subscription", 4: "birthday"}
                cal_type = cal_type_map.get(cal_type_int, f"unknown({cal_type_int})")

                result.append({
                    "name": str(cal.title()),
                    "ty
CalendarStore.get_events method · python · L195-L229 (35 LOC)
apple_calendar/eventkit.py
    def get_events(
        self,
        start_dt: datetime,
        end_dt: datetime,
        calendar_names: Optional[list[str]] = None,
    ) -> list[dict]:
        """Return events in the given date range."""
        err = self._ensure_store()
        if err:
            return [err]

        try:
            calendars = None
            if calendar_names:
                calendars = []
                for name in calendar_names:
                    cal = self._get_calendar_by_name(name)
                    if cal:
                        calendars.append(cal)
                if not calendars:
                    return [{"error": f"No matching calendars found for: {calendar_names}"}]

            predicate = self._store.predicateForEventsWithStartDate_endDate_calendars_(
                _ns_date(start_dt), _ns_date(end_dt), calendars
            )
            events = self._store.eventsMatchingPredicate_(predicate)
            if events is None:
                return []
        
CalendarStore.create_event method · python · L231-L274 (44 LOC)
apple_calendar/eventkit.py
    def create_event(
        self,
        title: str,
        start_dt: datetime,
        end_dt: datetime,
        calendar_name: Optional[str] = None,
        location: Optional[str] = None,
        notes: Optional[str] = None,
        is_all_day: bool = False,
    ) -> dict:
        """Create a calendar event and return its dict representation."""
        err = self._ensure_store()
        if err:
            return err

        try:
            event = EventKit.EKEvent.eventWithEventStore_(self._store)
            event.setTitle_(title)
            event.setStartDate_(_ns_date(start_dt))
            event.setEndDate_(_ns_date(end_dt))
            event.setAllDay_(is_all_day)

            if location:
                event.setLocation_(location)
            if notes:
                event.setNotes_(notes)

            if calendar_name:
                cal = self._get_calendar_by_name(calendar_name)
                if cal:
                    event.setCalendar_(cal)
               
CalendarStore.update_event method · python · L276-L308 (33 LOC)
apple_calendar/eventkit.py
    def update_event(self, event_uid: str, calendar_name: Optional[str] = None, **kwargs) -> dict:
        """Update an existing event by UID. Supported kwargs: title, start_dt,
        end_dt, location, notes, is_all_day."""
        err = self._ensure_store()
        if err:
            return err

        try:
            event = self._find_event_by_uid(event_uid, calendar_name)
            if event is None:
                return {"error": f"Event not found: {event_uid}"}

            if "title" in kwargs:
                event.setTitle_(kwargs["title"])
            if "start_dt" in kwargs:
                event.setStartDate_(_ns_date(kwargs["start_dt"]))
            if "end_dt" in kwargs:
                event.setEndDate_(_ns_date(kwargs["end_dt"]))
            if "location" in kwargs:
                event.setLocation_(kwargs["location"])
            if "notes" in kwargs:
                event.setNotes_(kwargs["notes"])
            if "is_all_day" in kwargs:
                event.
CalendarStore.delete_event method · python · L310-L327 (18 LOC)
apple_calendar/eventkit.py
    def delete_event(self, event_uid: str, calendar_name: Optional[str] = None) -> dict:
        """Delete an event by UID. Returns status dict on success, or an error dict."""
        err = self._ensure_store()
        if err:
            return err

        try:
            event = self._find_event_by_uid(event_uid, calendar_name)
            if event is None:
                return {"error": f"Event not found: {event_uid}"}

            success, error = self._store.removeEvent_span_error_(event, 0, None)
            if not success:
                return {"error": f"Failed to delete event: {error}"}
            return {"status": "deleted", "event_uid": event_uid}
        except (AttributeError, TypeError, RuntimeError) as e:
            logger.error("PyObjC error deleting event: %s", e)
            return {"error": f"Failed to delete event: {e}"}
CalendarStore.search_events method · python · L329-L357 (29 LOC)
apple_calendar/eventkit.py
    def search_events(
        self,
        query: str,
        start_dt: datetime,
        end_dt: datetime,
    ) -> list[dict]:
        """Search events by title text within a date range."""
        err = self._ensure_store()
        if err:
            return [err]

        try:
            predicate = self._store.predicateForEventsWithStartDate_endDate_calendars_(
                _ns_date(start_dt), _ns_date(end_dt), None
            )
            events = self._store.eventsMatchingPredicate_(predicate)
            if events is None:
                return []

            query_lower = query.lower()
            results = []
            for ev in events:
                title = str(ev.title()) if ev.title() else ""
                if query_lower in title.lower():
                    results.append(_event_to_dict(ev, str(ev.calendar().title())))
            return results
        except (AttributeError, TypeError, RuntimeError) as e:
            logger.error("PyObjC error searching
_escape_osascript function · python · L19-L27 (9 LOC)
apple_mail/mail.py
def _escape_osascript(text: str) -> str:
    """Escape text for safe use in AppleScript strings."""
    return (
        text.replace("\\", "\\\\")
        .replace('"', '\\"')
        .replace("\n", "\\n")
        .replace("\r", "\\r")
        .replace("\t", "\\t")
    )
Generated by Repobility's multi-pass static-analysis pipeline (https://repobility.com)
_run_applescript function · python · L30-L45 (16 LOC)
apple_mail/mail.py
def _run_applescript(script: str, timeout: int = _DEFAULT_TIMEOUT) -> dict:
    """Execute AppleScript and return {'output': ...} or {'error': ...}."""
    if not _IS_MACOS:
        return _PLATFORM_ERROR
    try:
        result = subprocess.run(
            ["osascript", "-e", script],
            capture_output=True, text=True, timeout=timeout, check=True,
        )
        return {"output": result.stdout.strip()}
    except subprocess.TimeoutExpired:
        return {"error": "Mail operation timed out"}
    except subprocess.CalledProcessError as e:
        return {"error": f"osascript failed: {e.stderr.strip()}"}
    except FileNotFoundError:
        return {"error": "osascript not found — not running on macOS?"}
MailStore.list_mailboxes method · python · L65-L94 (30 LOC)
apple_mail/mail.py
    def list_mailboxes(self) -> list[dict]:
        """List all mailboxes across all accounts with unread counts."""
        script = '''
tell application "Mail"
    set output to ""
    repeat with acct in accounts
        set acctName to name of acct
        repeat with mb in mailboxes of acct
            set mbName to name of mb
            set unread to unread count of mb
            set output to output & mbName & "|||" & acctName & "|||" & (unread as text) & return & "~~~RECORD~~~" & return
        end repeat
    end repeat
    return output
end tell
'''
        result = _run_applescript(script)
        if "error" in result:
            return [result]
        records = _parse_records(result["output"])
        mailboxes = []
        for rec in records:
            fields = _parse_fields(rec)
            if len(fields) >= 3:
                mailboxes.append({
                    "name": fields[0],
                    "account": fields[1],
                    "unread_count": int(fi
MailStore.get_messages method · python · L96-L144 (49 LOC)
apple_mail/mail.py
    def get_messages(self, mailbox: str = "INBOX", account: str = "", limit: int = 25) -> list[dict]:
        """Get recent messages (headers only) from a mailbox. Limit capped at 100."""
        limit = min(max(1, limit), 100)
        mailbox_esc = _escape_osascript(mailbox)
        if account:
            account_esc = _escape_osascript(account)
            mailbox_ref = f'mailbox "{mailbox_esc}" of account "{account_esc}"'
        else:
            mailbox_ref = f'mailbox "{mailbox_esc}" of account 1'
        script = f'''
tell application "Mail"
    set mb to {mailbox_ref}
    set msgCount to count of messages of mb
    if msgCount is 0 then return ""
    set maxMsg to msgCount
    if maxMsg > {limit} then set maxMsg to {limit}
    set output to ""
    repeat with i from 1 to maxMsg
        set msg to message i of mb
        set mid to message id of msg
        set subj to subject of msg
        set sndr to sender of msg
        set dt to date sent of msg as text
        set isRead
page 1 / 11next ›