← back to jrichyrich__chief-of-staff

Function bodies 522 total

All specs Real LLM only Function bodies
MailStore.get_message method · python · L146-L208 (63 LOC)
apple_mail/mail.py
    def get_message(self, message_id: str) -> dict:
        """Get full message content by message ID, including decoded body."""
        mid_esc = _escape_osascript(message_id)
        script = f'''
tell application "Mail"
    set foundMsg to missing value
    repeat with acct in accounts
        repeat with mb in mailboxes of acct
            try
                set matchMsgs to (messages of mb whose message id is "{mid_esc}")
                if (count of matchMsgs) > 0 then
                    set foundMsg to item 1 of matchMsgs
                    exit repeat
                end if
            end try
        end repeat
        if foundMsg is not missing value then exit repeat
    end repeat
    if foundMsg is missing value then return "ERROR: Message not found"
    set subj to subject of foundMsg
    set sndr to sender of foundMsg
    set dt to date sent of foundMsg as text
    set isRead to (read status of foundMsg)
    set isFlagged to (flagged status of foundMsg)
    set toList
MailStore.search_messages method · python · L210-L260 (51 LOC)
apple_mail/mail.py
    def search_messages(self, query: str, mailbox: str = "INBOX", account: str = "", limit: int = 25) -> list[dict]:
        """Search messages by subject or sender text in a mailbox."""
        limit = min(max(1, limit), 100)
        query_esc = _escape_osascript(query)
        mailbox_esc = _escape_osascript(mailbox)
        if account:
            account_esc = _escape_osascript(account)
            mailbox_ref = f'mailbox "{mailbox_esc}" of account "{account_esc}"'
        else:
            mailbox_ref = f'mailbox "{mailbox_esc}" of account 1'
        script = f'''
tell application "Mail"
    set mb to {mailbox_ref}
    set matchMsgs to (messages of mb whose subject contains "{query_esc}" or sender contains "{query_esc}")
    set msgCount to count of matchMsgs
    if msgCount is 0 then return ""
    set maxMsg to msgCount
    if maxMsg > {limit} then set maxMsg to {limit}
    set output to ""
    repeat with i from 1 to maxMsg
        set msg to item i of matchMsgs
        set mid 
MailStore.mark_read method · python · L262-L287 (26 LOC)
apple_mail/mail.py
    def mark_read(self, message_id: str, read: bool = True) -> dict:
        """Mark a message as read or unread."""
        mid_esc = _escape_osascript(message_id)
        read_val = "true" if read else "false"
        script = f'''
tell application "Mail"
    repeat with acct in accounts
        repeat with mb in mailboxes of acct
            try
                set matchMsgs to (messages of mb whose message id is "{mid_esc}")
                if (count of matchMsgs) > 0 then
                    set read status of item 1 of matchMsgs to {read_val}
                    return "OK"
                end if
            end try
        end repeat
    end repeat
    return "ERROR: Message not found"
end tell
'''
        result = _run_applescript(script)
        if "error" in result:
            return result
        if result["output"].startswith("ERROR:"):
            return {"error": result["output"]}
        return {"status": "ok", "message_id": message_id, "read": read}
MailStore.mark_flagged method · python · L289-L314 (26 LOC)
apple_mail/mail.py
    def mark_flagged(self, message_id: str, flagged: bool = True) -> dict:
        """Mark a message as flagged or unflagged."""
        mid_esc = _escape_osascript(message_id)
        flag_val = "true" if flagged else "false"
        script = f'''
tell application "Mail"
    repeat with acct in accounts
        repeat with mb in mailboxes of acct
            try
                set matchMsgs to (messages of mb whose message id is "{mid_esc}")
                if (count of matchMsgs) > 0 then
                    set flagged status of item 1 of matchMsgs to {flag_val}
                    return "OK"
                end if
            end try
        end repeat
    end repeat
    return "ERROR: Message not found"
end tell
'''
        result = _run_applescript(script)
        if "error" in result:
            return result
        if result["output"].startswith("ERROR:"):
            return {"error": result["output"]}
        return {"status": "ok", "message_id": message_id, "flagged": fla
MailStore.move_message method · python · L316-L347 (32 LOC)
apple_mail/mail.py
    def move_message(self, message_id: str, target_mailbox: str, target_account: str = "") -> dict:
        """Move a message to a different mailbox."""
        mid_esc = _escape_osascript(message_id)
        target_mb_esc = _escape_osascript(target_mailbox)
        if target_account:
            target_acct_esc = _escape_osascript(target_account)
            target_ref = f'mailbox "{target_mb_esc}" of account "{target_acct_esc}"'
        else:
            target_ref = f'mailbox "{target_mb_esc}" of account 1'
        script = f'''
tell application "Mail"
    set targetMb to {target_ref}
    repeat with acct in accounts
        repeat with mb in mailboxes of acct
            try
                set matchMsgs to (messages of mb whose message id is "{mid_esc}")
                if (count of matchMsgs) > 0 then
                    move item 1 of matchMsgs to targetMb
                    return "OK"
                end if
            end try
        end repeat
    end repeat
    return "ERR
MailStore.reply_message method · python · L349-L421 (73 LOC)
apple_mail/mail.py
    def reply_message(
        self,
        message_id: str,
        body: str,
        reply_all: bool = False,
        cc: Optional[list[str]] = None,
        bcc: Optional[list[str]] = None,
        confirm_send: bool = False,
    ) -> dict:
        """Reply to an existing message in-thread. confirm_send must be True."""
        if not confirm_send:
            return {"error": "confirm_send must be True. Please confirm with the user before sending."}

        mid_esc = _escape_osascript(message_id)
        body_esc = _escape_osascript(body)

        if reply_all:
            reply_flag = "with opening window and reply to all"
        else:
            reply_flag = "with opening window without reply to all"

        # Build additional recipient blocks for extra CC/BCC
        cc_block = ""
        if cc:
            for addr in cc:
                addr_esc = _escape_osascript(addr)
                cc_block += f'\nmake new cc recipient at end of cc recipients of replyMsg with proper
MailStore.send_message method · python · L423-L480 (58 LOC)
apple_mail/mail.py
    def send_message(
        self,
        to: list[str],
        subject: str,
        body: str,
        cc: Optional[list[str]] = None,
        bcc: Optional[list[str]] = None,
        confirm_send: bool = False,
    ) -> dict:
        """Compose and send an email. confirm_send must be True."""
        if not confirm_send:
            return {"error": "confirm_send must be True. Please confirm with the user before sending."}

        subject_esc = _escape_osascript(subject)
        body_esc = _escape_osascript(body)

        # Build recipient AppleScript blocks
        to_block = ""
        for addr in to:
            addr_esc = _escape_osascript(addr)
            to_block += f'\nmake new to recipient at end of to recipients with properties {{address:"{addr_esc}"}}'

        cc_block = ""
        if cc:
            for addr in cc:
                addr_esc = _escape_osascript(addr)
                cc_block += f'\nmake new cc recipient at end of cc recipients with properties {{addres
Want fix-PRs on findings? Install Repobility's GitHub App · github.com/apps/repobility-bot
decode_attributed_body function · python · L14-L39 (26 LOC)
apple_messages/messages.py
def decode_attributed_body(blob: bytes) -> str | None:
    """Decode an attributedBody typedstream blob to plain text.

    Returns None if the blob cannot be decoded.
    """
    if not blob or b"NSString" not in blob:
        return None
    try:
        content = blob.split(b"NSString")[1][5:]  # Skip 5-byte preamble
        indicator = content[0]
        if indicator == 0x81:
            length = int.from_bytes(content[1:3], "little")
            start = 3
        elif indicator == 0x82:
            length = int.from_bytes(content[1:5], "little")
            start = 5
        elif indicator == 0x83:
            length = int.from_bytes(content[1:9], "little")
            start = 9
        else:
            length = indicator
            start = 1
        text = content[start : start + length].decode("utf-8", errors="replace")
        return text if text else None
    except (IndexError, ValueError):
        return None
MessageStore.__init__ method · python · L53-L71 (19 LOC)
apple_messages/messages.py
    def __init__(
        self,
        db_path: Path | None = None,
        communicate_script: Path | None = None,
        profile_db_path: Path | None = None,
    ):
        self.db_path = Path(db_path) if db_path else (Path.home() / "Library" / "Messages" / "chat.db")
        self.communicate_script = (
            Path(communicate_script)
            if communicate_script
            else (_project_root() / "scripts" / "communicate.sh")
        )
        self.profile_db_path = (
            Path(profile_db_path)
            if profile_db_path
            else (_project_root() / "data" / "imessage-thread-profiles.db")
        )
        self.profile_db_path.parent.mkdir(parents=True, exist_ok=True)
        self._init_profile_db()
MessageStore._open_chat_db method · python · L73-L79 (7 LOC)
apple_messages/messages.py
    def _open_chat_db(self) -> sqlite3.Connection:
        """Open the Messages database read-only.

        Returns:
            A new connection opened with ``mode=ro`` whose rows are
            ``sqlite3.Row`` objects. The caller is responsible for closing it.

        Raises:
            RuntimeError: When not running on macOS.
        """
        if not _IS_MACOS:
            raise RuntimeError(_PLATFORM_ERROR["error"])
        # In a SQLite URI, `?` starts the query string, `#` starts a fragment
        # and `%` starts an escape, so the path must be percent-encoded rather
        # than interpolated verbatim. as_uri() needs an absolute path.
        uri = f"{Path(self.db_path).absolute().as_uri()}?mode=ro"
        conn = sqlite3.connect(uri, uri=True)
        conn.row_factory = sqlite3.Row
        return conn
MessageStore._init_profile_db method · python · L98-L121 (24 LOC)
apple_messages/messages.py
    def _init_profile_db(self) -> None:
        with self._open_profile_db() as conn:
            conn.executescript(
                """
                CREATE TABLE IF NOT EXISTS thread_profiles (
                    chat_identifier TEXT PRIMARY KEY,
                    display_name TEXT,
                    preferred_handle TEXT,
                    notes TEXT,
                    auto_reply_mode TEXT NOT NULL DEFAULT 'manual',
                    last_seen_at TEXT,
                    last_sender TEXT,
                    updated_at_utc TEXT NOT NULL
                );

                CREATE TABLE IF NOT EXISTS thread_members (
                    chat_identifier TEXT NOT NULL,
                    handle TEXT NOT NULL,
                    last_seen_at TEXT,
                    PRIMARY KEY(chat_identifier, handle)
                );
                """
            )
            conn.commit()
MessageStore._upsert_thread_observation method · python · L123-L168 (46 LOC)
apple_messages/messages.py
    def _upsert_thread_observation(self, chat_identifier: str, sender: str, date_local: str) -> None:
        chat_identifier = (chat_identifier or "").strip()
        sender = (sender or "").strip()
        date_local = (date_local or "").strip()
        if not chat_identifier:
            return
        now = self._utc_now_iso()
        with self._open_profile_db() as conn:
            conn.execute(
                """
                INSERT INTO thread_profiles(
                    chat_identifier, display_name, preferred_handle, notes, auto_reply_mode,
                    last_seen_at, last_sender, updated_at_utc
                )
                VALUES(?, NULL, ?, NULL, 'manual', ?, ?, ?)
                ON CONFLICT(chat_identifier) DO UPDATE SET
                    preferred_handle = CASE
                        WHEN COALESCE(thread_profiles.preferred_handle, '') = '' THEN excluded.preferred_handle
                        ELSE thread_profiles.preferred_handle
                    
MessageStore._record_observations method · python · L170-L180 (11 LOC)
apple_messages/messages.py
    def _record_observations(self, rows: list[dict]) -> None:
        seen: set[tuple[str, str, str]] = set()
        for row in rows:
            chat_identifier = str(row.get("chat_identifier", "")).strip()
            sender = str(row.get("sender", "")).strip()
            date_local = str(row.get("date_local", "")).strip()
            key = (chat_identifier, sender, date_local)
            if key in seen:
                continue
            seen.add(key)
            self._upsert_thread_observation(chat_identifier, sender, date_local)
MessageStore._load_profiles method · python · L182-L223 (42 LOC)
apple_messages/messages.py
    def _load_profiles(self, chat_ids: list[str]) -> dict[str, dict]:
        chat_ids = [c for c in chat_ids if c]
        if not chat_ids:
            return {}
        placeholders = ",".join("?" for _ in chat_ids)
        with self._open_profile_db() as conn:
            rows = conn.execute(
                f"""
                SELECT
                    p.chat_identifier,
                    COALESCE(p.display_name, '') AS display_name,
                    COALESCE(p.preferred_handle, '') AS preferred_handle,
                    COALESCE(p.notes, '') AS notes,
                    COALESCE(p.auto_reply_mode, 'manual') AS auto_reply_mode,
                    COALESCE(p.last_seen_at, '') AS last_seen_at,
                    COALESCE(p.last_sender, '') AS last_sender,
                    COALESCE(
                        (
                            SELECT group_concat(m.handle, '|||')
                            FROM thread_members m
                            WHERE m.chat_identifi
MessageStore.get_messages method · python · L225-L290 (66 LOC)
apple_messages/messages.py
    def get_messages(
        self,
        minutes: int = 60,
        limit: int = 25,
        include_from_me: bool = True,
        conversation: str = "",
    ) -> list[dict]:
        """Get recent iMessages from local chat.db."""
        if not _IS_MACOS:
            return [_PLATFORM_ERROR]
        safe_minutes = self._normalize_minutes(minutes)
        safe_limit = self._normalize_limit(limit)
        where = [
            "m.date > ((strftime('%s', 'now') - 978307200 - (? * 60)) * 1000000000)",
        ]
        params: list[object] = [safe_minutes]
        if not include_from_me:
            where.append("COALESCE(m.is_from_me, 0) = 0")
        if conversation:
            where.append(
                "("
                "lower(COALESCE(h.id, '')) LIKE ? "
                "OR lower(COALESCE(c.chat_identifier, '')) LIKE ?"
                ")"
            )
            needle = f"%{conversation.lower()}%"
            params.extend([needle, needle])
        params.append(safe_lim
Repobility's GitHub App fixes findings like these · https://github.com/apps/repobility-bot
MessageStore.search_messages method · python · L292-L372 (81 LOC)
apple_messages/messages.py
    def search_messages(
        self,
        query: str,
        minutes: int = 24 * 60,
        limit: int = 25,
        include_from_me: bool = True,
    ) -> list[dict]:
        """Search iMessages by text, sender, or conversation identifier."""
        if not _IS_MACOS:
            return [_PLATFORM_ERROR]
        query = (query or "").strip()
        if not query:
            return []
        safe_minutes = self._normalize_minutes(minutes)
        safe_limit = self._normalize_limit(limit)
        where = [
            "m.date > ((strftime('%s', 'now') - 978307200 - (? * 60)) * 1000000000)",
            "("
            "lower(COALESCE(m.text, '')) LIKE ? "
            "OR lower(COALESCE(h.id, '')) LIKE ? "
            "OR lower(COALESCE(c.chat_identifier, '')) LIKE ? "
            "OR (m.text IS NULL AND m.attributedBody IS NOT NULL)"
            ")",
        ]
        params: list[object] = [safe_minutes]
        needle = f"%{query.lower()}%"
        params.extend([needle, need
MessageStore.list_threads method · python · L374-L446 (73 LOC)
apple_messages/messages.py
    def list_threads(self, minutes: int = 7 * 24 * 60, limit: int = 50) -> list[dict]:
        """List active iMessage threads with persisted profile metadata."""
        if not _IS_MACOS:
            return [_PLATFORM_ERROR]
        safe_minutes = self._normalize_minutes(minutes)
        safe_limit = self._normalize_limit(limit)
        sql = """
            SELECT
                COALESCE(c.chat_identifier, '') AS chat_identifier,
                datetime(MAX(m.date) / 1000000000 + 978307200, 'unixepoch', 'localtime') AS last_message_date_local,
                (
                    SELECT m2.text
                    FROM chat_message_join cmj2
                    JOIN message m2 ON m2.ROWID = cmj2.message_id
                    WHERE cmj2.chat_id = c.ROWID
                    ORDER BY m2.date DESC
                    LIMIT 1
                ) AS last_message_text,
                (
                    SELECT m3.attributedBody
                    FROM chat_message_join cmj3
         
MessageStore.get_thread_messages method · python · L448-L508 (61 LOC)
apple_messages/messages.py
    def get_thread_messages(
        self,
        chat_identifier: str,
        minutes: int = 7 * 24 * 60,
        limit: int = 50,
        include_from_me: bool = True,
    ) -> list[dict]:
        """Get messages for a specific iMessage thread by chat_identifier."""
        if not _IS_MACOS:
            return [_PLATFORM_ERROR]
        chat_identifier = (chat_identifier or "").strip()
        if not chat_identifier:
            return [{"error": "chat_identifier is required"}]
        safe_minutes = self._normalize_minutes(minutes)
        safe_limit = self._normalize_limit(limit)
        where = [
            "c.chat_identifier = ?",
            "m.date > ((strftime('%s', 'now') - 978307200 - (? * 60)) * 1000000000)",
        ]
        params: list[object] = [chat_identifier, safe_minutes]
        if not include_from_me:
            where.append("COALESCE(m.is_from_me, 0) = 0")
        params.append(safe_limit)

        sql = f"""
            SELECT
                m.guid AS guid,
MessageStore.get_thread_context method · python · L510-L558 (49 LOC)
apple_messages/messages.py
    def get_thread_context(
        self,
        chat_identifier: str,
        minutes: int = 7 * 24 * 60,
        limit: int = 20,
    ) -> dict:
        """Get thread profile + recent message context for orchestration."""
        chat_identifier = (chat_identifier or "").strip()
        if not chat_identifier:
            return {"error": "chat_identifier is required"}
        messages = self.get_thread_messages(
            chat_identifier=chat_identifier,
            minutes=minutes,
            limit=limit,
            include_from_me=True,
        )
        if messages and isinstance(messages[0], dict) and messages[0].get("error"):
            return {"error": messages[0]["error"]}
        profile = self._load_profiles([chat_identifier]).get(chat_identifier, {})
        participants = sorted(
            {
                str(m.get("sender", "")).strip()
                for m in messages
                if str(m.get("sender", "")).strip()
            }
        )
        if profi
MessageStore._resolve_chat_guid method · python · L560-L577 (18 LOC)
apple_messages/messages.py
    def _resolve_chat_guid(self, chat_identifier: str) -> str | None:
        """Look up the Messages.app guid for a chat_identifier.

        The AppleScript ``chat id`` property corresponds to the ``guid`` column
        in chat.db (e.g. ``iMessage;+;chat336858519315148840``), not the
        ``chat_identifier`` column.  Returns *None* when the identifier is not
        found so the caller can fall back gracefully.
        """
        try:
            with self._open_chat_db() as conn:
                row = conn.execute(
                    "SELECT guid FROM chat WHERE chat_identifier = ? LIMIT 1",
                    (chat_identifier,),
                ).fetchone()
                return row["guid"] if row else None
        except (sqlite3.OperationalError, sqlite3.IntegrityError) as exc:
            logger.error("SQLite error resolving chat guid: %s", exc)
            return None
MessageStore.send_message method · python · L579-L638 (60 LOC)
apple_messages/messages.py
    def send_message(
        self,
        to: str = "",
        body: str = "",
        confirm_send: bool = False,
        chat_identifier: str = "",
    ) -> dict:
        """Send an iMessage using communicate.sh. Requires confirm_send=True."""
        if not _IS_MACOS:
            return _PLATFORM_ERROR
        to = (to or "").strip()
        body = (body or "").strip()
        chat_identifier = (chat_identifier or "").strip()
        if not to and not chat_identifier:
            return {"error": "Missing recipient: provide 'to' or 'chat_identifier'"}
        if not body:
            return {"error": "Missing message body"}
        if not confirm_send:
            route = {"chat_identifier": chat_identifier} if chat_identifier else {"to": to}
            return {
                "status": "preview",
                "channel": "imessage",
                **route,
                "body": body,
                "requires_confirmation": True,
            }
        if not self.communic
_escape_osascript function · python · L8-L16 (9 LOC)
apple_notifications/notifier.py
def _escape_osascript(text: str) -> str:
    """Escape text for safe use in AppleScript strings."""
    return (
        text.replace("\\", "\\\\")
        .replace('"', '\\"')
        .replace("\n", "\\n")
        .replace("\r", "\\r")
        .replace("\t", "\\t")
    )
Notifier.send method · python · L23-L55 (33 LOC)
apple_notifications/notifier.py
    def send(
        title: str,
        message: str,
        subtitle: Optional[str] = None,
        sound: Optional[str] = "default",
    ) -> dict:
        if not _IS_MACOS:
            return {"error": "Notifications are only available on macOS"}

        title_escaped = _escape_osascript(title)
        message_escaped = _escape_osascript(message)

        script = f'display notification "{message_escaped}" with title "{title_escaped}"'
        if subtitle:
            script += f' subtitle "{_escape_osascript(subtitle)}"'
        if sound:
            script += f' sound name "{_escape_osascript(sound)}"'

        try:
            subprocess.run(
                ["osascript", "-e", script],
                capture_output=True,
                text=True,
                timeout=5,
                check=True,
            )
            return {"status": "sent", "title": title, "message": message}
        except subprocess.TimeoutExpired:
            return {"error": "Notification ti
Repobility — same analyzer, your code, free for public repos · /scan/
Notifier.send_alert method · python · L58-L81 (24 LOC)
apple_notifications/notifier.py
    def send_alert(title: str, message: str) -> dict:
        """Show a ``display alert`` dialog through ``osascript``.

        Args:
            title: Alert title; escaped before being embedded in the script.
            message: Alert message text; escaped the same way.

        Returns:
            ``{"status": "sent", "title": title}`` when ``osascript`` exits
            successfully, otherwise an ``{"error": ...}`` dict: not on macOS,
            no exit within the 30-second timeout, non-zero exit, or
            ``osascript`` missing.
        """
        if not _IS_MACOS:
            return {"error": "Alerts are only available on macOS"}

        command = [
            "osascript",
            "-e",
            f'display alert "{_escape_osascript(title)}" message "{_escape_osascript(message)}"',
        ]

        try:
            subprocess.run(command, capture_output=True, text=True, timeout=30, check=True)
        except subprocess.TimeoutExpired:
            return {"error": "Alert timed out (user may not have responded)"}
        except subprocess.CalledProcessError as exc:
            return {"error": f"osascript failed: {exc.stderr.strip()}"}
        except FileNotFoundError:
            return {"error": "osascript not found — not running on macOS?"}
        return {"status": "sent", "title": title}
_reminder_to_dict function · python · L36-L72 (37 LOC)
apple_reminders/eventkit.py
def _reminder_to_dict(reminder) -> dict:
    """Convert an EKReminder to a plain dict."""
    due_date = None
    due_components = reminder.dueDateComponents()
    if due_components:
        cal = NSCalendar.currentCalendar()
        ns_date = cal.dateFromComponents_(due_components)
        if ns_date:
            due_date = datetime.fromtimestamp(
                ns_date.timeIntervalSince1970()
            ).isoformat()

    completion_date = None
    comp_ns = reminder.completionDate()
    if comp_ns:
        completion_date = datetime.fromtimestamp(
            comp_ns.timeIntervalSince1970()
        ).isoformat()

    creation_date = None
    create_ns = reminder.creationDate()
    if create_ns:
        creation_date = datetime.fromtimestamp(
            create_ns.timeIntervalSince1970()
        ).isoformat()

    return {
        "id": str(reminder.calendarItemExternalIdentifier()),
        "title": str(reminder.title()) if reminder.title() else "",
        "notes": str(reminder.n
ReminderStore._ensure_store method · python · L90-L107 (18 LOC)
apple_reminders/eventkit.py
    def _ensure_store(self):
        """Return a usable EKEventStore, creating it and asking for access on first use.

        Returns:
            The cached EKEventStore when access has been granted;
            ``_PLATFORM_ERROR`` when EventKit is unavailable;
            ``_PERMISSION_ERROR`` when access was not granted.
        """
        if not _EVENTKIT_AVAILABLE:
            return _PLATFORM_ERROR

        # The store object is created once and reused afterwards.
        if self._store is None:
            self._store = EventKit.EKEventStore.alloc().init()

        # The access answer is requested once and then cached on the instance.
        if self._access_granted is None:
            self._access_granted = self._request_access()

        return self._store if self._access_granted else _PERMISSION_ERROR
ReminderStore._request_access method · python · L109-L120 (12 LOC)
apple_reminders/eventkit.py
    def _request_access(self) -> bool:
        """Request Reminders access synchronously."""
        granted_flag = threading.Event()
        result = {"granted": False}

        def handler(granted, error):
            result["granted"] = granted
            granted_flag.set()

        self._store.requestFullAccessToRemindersWithCompletion_(handler)
        granted_flag.wait(timeout=30)
        return result["granted"]
ReminderStore._get_reminder_list_by_name method · python · L122-L127 (6 LOC)
apple_reminders/eventkit.py
    def _get_reminder_list_by_name(self, name: str):
        """Return the first reminder list whose title equals *name*, or None.

        The comparison is an exact match on each list's ``title()``.
        """
        reminder_lists = self._store.calendarsForEntityType_(_EK_ENTITY_TYPE_REMINDER)
        return next(
            (candidate for candidate in reminder_lists if candidate.title() == name),
            None,
        )
ReminderStore._fetch_reminders method · python · L129-L142 (14 LOC)
apple_reminders/eventkit.py
    def _fetch_reminders(self, predicate) -> list:
        """Execute a reminder fetch with predicate, blocking until complete."""
        results = []
        done = threading.Event()

        def handler(reminders):
            if reminders:
                for r in reminders:
                    results.append(r)
            done.set()

        self._store.fetchRemindersMatchingPredicate_completion_(predicate, handler)
        done.wait(timeout=10)
        return results
ReminderStore._find_reminder_by_id method · python · L144-L154 (11 LOC)
apple_reminders/eventkit.py
    def _find_reminder_by_id(self, reminder_id: str):
        """Find a single EKReminder by its external identifier."""
        item = self._store.calendarItemWithIdentifier_(reminder_id)
        if item is not None:
            return item
        # Fallback: fetch all reminders and filter
        predicate = self._store.predicateForRemindersInCalendars_(None)
        for r in self._fetch_reminders(predicate):
            if str(r.calendarItemExternalIdentifier()) == reminder_id:
                return r
        return None
ReminderStore.list_reminder_lists method · python · L160-L186 (27 LOC)
apple_reminders/eventkit.py
    def list_reminder_lists(self) -> list[dict]:
        """Return all reminder lists (calendars with entityType Reminder)."""
        store = self._ensure_store()
        if isinstance(store, dict):
            return [store]

        try:
            calendars = store.calendarsForEntityType_(_EK_ENTITY_TYPE_REMINDER)
            result = []
            for cal in calendars:
                color = cal.color()
                color_hex = None
                if color:
                    r = int(color.redComponent() * 255)
                    g = int(color.greenComponent() * 255)
                    b = int(color.blueComponent() * 255)
                    color_hex = f"#{r:02x}{g:02x}{b:02x}"

                result.append({
                    "name": str(cal.title()),
                    "source": str(cal.source().title()) if cal.source() else None,
                    "color": color_hex,
                })
            return result
        except (AttributeError, TypeError, Runtime
Repobility · severity-and-effort ranking · https://repobility.com
ReminderStore.list_reminders method · python · L188-L225 (38 LOC)
apple_reminders/eventkit.py
    def list_reminders(
        self,
        list_name: Optional[str] = None,
        completed: Optional[bool] = None,
    ) -> list[dict]:
        """Fetch reminders, optionally filtered by list and completion status.

        completed=None returns all, True returns completed only,
        False returns incomplete only.
        """
        store = self._ensure_store()
        if isinstance(store, dict):
            return [store]

        try:
            calendars = None
            if list_name:
                cal = self._get_reminder_list_by_name(list_name)
                if not cal:
                    return [{"error": f"Reminder list '{list_name}' not found"}]
                calendars = [cal]

            if completed is True:
                predicate = store.predicateForCompletedRemindersWithCompletionDateStarting_ending_calendars_(
                    None, None, calendars
                )
            elif completed is False:
                predicate = store.predicate
ReminderStore.create_reminder method · python · L227-L283 (57 LOC)
apple_reminders/eventkit.py
    def create_reminder(
        self,
        title: str,
        list_name: Optional[str] = None,
        due_date: Optional[str] = None,
        priority: Optional[int] = None,
        notes: Optional[str] = None,
    ) -> dict:
        """Create a new reminder and return its dict representation.

        Args:
            title: Reminder title.
            list_name: Target reminder list name, or default list if None.
            due_date: ISO format date string (e.g. "2025-03-15" or "2025-03-15T14:00:00").
            priority: 0=none, 1=high, 4=medium, 9=low.
            notes: Optional notes text.
        """
        store = self._ensure_store()
        if isinstance(store, dict):
            return store

        try:
            reminder = EventKit.EKReminder.reminderWithEventStore_(store)
            reminder.setTitle_(title)

            if list_name:
                cal = self._get_reminder_list_by_name(list_name)
                if not cal:
                    return {"err
ReminderStore.complete_reminder method · python · L285-L304 (20 LOC)
apple_reminders/eventkit.py
    def complete_reminder(self, reminder_id: str) -> dict:
        """Flag the reminder identified by *reminder_id* as completed.

        Returns:
            The reminder's dict representation once the change has been
            saved, or a dict with an ``"error"`` key when the reminder is
            not found, the save is rejected, or a PyObjC call raises. If
            ``_ensure_store()`` yields a dict it is returned unchanged.
        """
        store = self._ensure_store()
        if isinstance(store, dict):
            # No usable store came back; hand the dict straight to the caller.
            return store

        try:
            target = self._find_reminder_by_id(reminder_id)
            if target is None:
                return {"error": f"Reminder not found: {reminder_id}"}

            target.setCompleted_(True)
            saved, save_error = store.saveReminder_commit_error_(target, True, None)
            if saved:
                return _reminder_to_dict(target)
            return {"error": f"Failed to complete reminder: {save_error}"}
        except (AttributeError, TypeError, RuntimeError) as exc:
            logger.error("PyObjC error completing reminder: %s", exc)
            return {"error": f"Failed to complete reminder: {exc}"}
ReminderStore.delete_reminder method · python · L306-L323 (18 LOC)
apple_reminders/eventkit.py
    def delete_reminder(self, reminder_id: str) -> dict:
        """Remove the reminder identified by *reminder_id* from the store.

        Returns:
            ``{"status": "deleted", "reminder_id": ...}`` when the removal is
            committed, or a dict with an ``"error"`` key when the reminder is
            not found, the removal is rejected, or a PyObjC call raises. If
            ``_ensure_store()`` yields a dict it is returned unchanged.
        """
        store = self._ensure_store()
        if isinstance(store, dict):
            # No usable store came back; hand the dict straight to the caller.
            return store

        try:
            doomed = self._find_reminder_by_id(reminder_id)
            if doomed is None:
                return {"error": f"Reminder not found: {reminder_id}"}

            removed, remove_error = store.removeReminder_commit_error_(doomed, True, None)
            if removed:
                return {"status": "deleted", "reminder_id": reminder_id}
            return {"error": f"Failed to delete reminder: {remove_error}"}
        except (AttributeError, TypeError, RuntimeError) as exc:
            logger.error("PyObjC error deleting reminder: %s", exc)
            return {"error": f"Failed to delete reminder: {exc}"}
ReminderStore.search_reminders method · python · L325-L358 (34 LOC)
apple_reminders/eventkit.py
    def search_reminders(
        self,
        query: str,
        include_completed: bool = False,
    ) -> list[dict]:
        """Search reminders by title text.

        Args:
            query: Text to search for in reminder titles (case-insensitive).
            include_completed: If False (default), only search incomplete reminders.
        """
        store = self._ensure_store()
        if isinstance(store, dict):
            return [store]

        try:
            if include_completed:
                predicate = store.predicateForRemindersInCalendars_(None)
            else:
                predicate = store.predicateForIncompleteRemindersWithDueDateStarting_ending_calendars_(
                    None, None, None
                )

            raw_reminders = self._fetch_reminders(predicate)
            query_lower = query.lower()
            results = []
            for r in raw_reminders:
                title = str(r.title()) if r.title() else ""
                if query
TeamsBrowserManager.__init__ method · python · L43-L51 (9 LOC)
browser/manager.py
    def __init__(
        self,
        cdp_port: int = DEFAULT_CDP_PORT,
        state_path: Optional[Path] = None,
        profile_dir: Optional[Path] = None,
    ):
        """Record the CDP port and the on-disk locations this manager uses.

        Args:
            cdp_port: Port of the Chromium remote-debugging (CDP) endpoint.
            state_path: File holding the saved browser state; a falsy value
                selects ``DEFAULT_STATE_PATH``.
            profile_dir: Directory passed to Chromium as its user-data dir;
                a falsy value selects ``DEFAULT_PROFILE_DIR``.
        """
        self.cdp_port = cdp_port
        self.state_path = state_path if state_path else DEFAULT_STATE_PATH
        self.profile_dir = profile_dir if profile_dir else DEFAULT_PROFILE_DIR
TeamsBrowserManager._load_state method · python · L53-L58 (6 LOC)
browser/manager.py
    def _load_state(self) -> Optional[dict]:
        """Load browser state (PID, port) from disk.

        Returns:
            The parsed state dict, or ``None`` when the state file is
            missing, unreadable, not decodable as text, not valid JSON, or
            holds a JSON value other than an object.
        """
        try:
            state = json.loads(self.state_path.read_text())
        except (OSError, ValueError):
            # OSError covers a missing or unreadable file. ValueError covers
            # json.JSONDecodeError as well as the UnicodeDecodeError raised
            # when the file contains bytes that cannot be decoded.
            return None
        # Callers unpack and index the result as a mapping, so any other JSON
        # value (list, number, null, ...) is treated like a corrupt file.
        return state if isinstance(state, dict) else None
TeamsBrowserManager._clear_state method · python · L65-L70 (6 LOC)
browser/manager.py
    def _clear_state(self) -> None:
        """Delete the state file; a file that is already absent is ignored."""
        self.state_path.unlink(missing_ok=True)
Want fix-PRs on findings? Install Repobility's GitHub App · github.com/apps/repobility-bot
TeamsBrowserManager.is_alive method · python · L72-L79 (8 LOC)
browser/manager.py
    def is_alive(self) -> bool:
        """Check if the CDP endpoint is responding.

        Requests ``/json/version`` on ``127.0.0.1:<cdp_port>`` with a
        two-second timeout.

        Returns:
            ``True`` only when the endpoint answers with HTTP status 200;
            ``False`` for a connection failure, a timeout, any other status,
            or a reply that is not valid HTTP.
        """
        # Imported here so the method does not rely on the module's import
        # block also providing http.client.
        from http.client import HTTPException

        url = f"http://127.0.0.1:{self.cdp_port}/json/version"
        try:
            with urlopen(url, timeout=2) as resp:
                return resp.status == 200
        except (URLError, OSError, TimeoutError, HTTPException):
            # HTTPException (e.g. BadStatusLine) is not an OSError; it is
            # raised when whatever listens on the port does not speak HTTP.
            # A liveness probe must report that as "not alive", not crash.
            return False
TeamsBrowserManager._find_chromium_path method · python · L82-L97 (16 LOC)
browser/manager.py
    def _find_chromium_path() -> Optional[str]:
        """Find the Playwright-bundled Chromium executable.

        Scans the ``ms-playwright`` cache directory under the user's home
        for ``chromium-*`` folders, highest revision first, and returns the
        first one that contains the expected executable.

        Returns:
            The executable's path as a string, or ``None`` when no matching
            installation exists.
        """
        if platform.system() == "Darwin":
            cache = Path.home() / "Library" / "Caches" / "ms-playwright"
            executable = Path(
                "chrome-mac", "Chromium.app", "Contents", "MacOS", "Chromium"
            )
        else:
            cache = Path.home() / ".cache" / "ms-playwright"
            executable = Path("chrome-linux", "chrome")

        def revision_key(folder: Path) -> tuple:
            # Compare the trailing revision as a number: as plain strings
            # "chromium-999" would sort above "chromium-1000". Folders with
            # no numeric suffix sort last; the name breaks ties.
            suffix = folder.name.rpartition("-")[2]
            return (int(suffix) if suffix.isdecimal() else -1, folder.name)

        for chromium_dir in sorted(
            cache.glob("chromium-*"), key=revision_key, reverse=True
        ):
            candidate = chromium_dir / executable
            if candidate.exists():
                return str(candidate)
        return None
TeamsBrowserManager.launch method · python · L99-L141 (43 LOC)
browser/manager.py
    def launch(self) -> dict:
        """Launch Chromium as a detached subprocess.

        Returns a status dict with pid and cdp_port.
        If already running, returns current status.
        """
        if self.is_alive():
            state = self._load_state() or {}
            return {"status": "already_running", **state}

        chromium = self._find_chromium_path()
        if chromium is None:
            return {
                "status": "error",
                "error": "Chromium not found. Run: playwright install chromium",
            }

        self.profile_dir.mkdir(parents=True, exist_ok=True)

        proc = subprocess.Popen(
            [chromium, f"--remote-debugging-port={self.cdp_port}",
             f"--user-data-dir={self.profile_dir}",
             "--no-first-run", "--no-default-browser-check"],
            stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL,
            start_new_session=True,
        )

        for _ in range(30):
            if self.is_a
TeamsBrowserManager.connect method · python · L143-L163 (21 LOC)
browser/manager.py
    async def connect(self):
        """Connect to running Chromium via CDP.

        Returns ``(playwright, browser)`` tuple. Caller must call
        ``await pw.stop()`` when done (this disconnects but does NOT
        kill the browser).

        Raises ``RuntimeError`` if Playwright is not installed or the
        browser is not running. An error raised while attaching over CDP
        is re-raised after the Playwright driver started here has been
        stopped.
        """
        if async_playwright is None:
            raise RuntimeError("playwright is not installed")
        if not self.is_alive():
            raise RuntimeError(
                "Browser is not running. Call open_teams_browser first."
            )
        pw = await async_playwright().start()
        try:
            browser = await pw.chromium.connect_over_cdp(
                f"http://127.0.0.1:{self.cdp_port}",
                timeout=10_000,
            )
        except BaseException:
            # The caller never receives ``pw`` on this path, so nobody else
            # could stop it; stop it here to avoid leaking the driver, then
            # let the original error (or cancellation) propagate.
            await pw.stop()
            raise
        return pw, browser
TeamsBrowserManager.close method · python · L165-L180 (16 LOC)
browser/manager.py
    def close(self) -> dict:
        """Stop the Chromium process.

        Sends SIGTERM to the PID recorded in the saved state (if any),
        removes the state file, then polls the CDP endpoint up to six times,
        half a second apart.

        Returns:
            ``{"status": "closed"}`` once the endpoint stops responding, or
            a ``{"status": "error", ...}`` dict if it is still responding
            after the last poll.
        """
        saved = self._load_state()
        if saved and "pid" in saved:
            pid = saved["pid"]
            try:
                os.kill(pid, signal.SIGTERM)
            except ProcessLookupError:
                # The recorded process no longer exists; nothing to signal.
                pass
            else:
                logger.info("Sent SIGTERM to pid %d", pid)
        self._clear_state()

        # Brief wait for the process to exit after SIGTERM.
        polls_left = 6
        while polls_left > 0:
            if not self.is_alive():
                return {"status": "closed"}
            time.sleep(0.5)
            polls_left -= 1
        return {"status": "error", "error": "Browser still running after SIGTERM"}
TeamsNavigator._find_element method · python · L34-L46 (13 LOC)
browser/navigator.py
    async def _find_element(page, selectors, timeout_ms: int = 10_000):
        """Poll *page* for the first selector that matches something.

        Every selector is tried in order on each pass, with one second
        between passes. Only the time spent sleeping counts towards
        *timeout_ms*, and at least one pass is always made.

        Returns:
            The locator of the first selector with a non-zero match count,
            or ``None`` once the timeout budget is used up.
        """
        poll_ms = 1_000
        waited_ms = 0
        while True:
            for candidate in selectors:
                found = page.locator(candidate)
                if await found.count() > 0:
                    return found
            waited_ms += poll_ms
            if waited_ms >= timeout_ms:
                break
            await asyncio.sleep(poll_ms / 1_000)
        return None
TeamsNavigator._detect_channel_name method · python · L49-L72 (24 LOC)
browser/navigator.py
    async def _detect_channel_name(page) -> str:
        """Work out the name of the active channel/conversation from the DOM.

        Each selector in ``CHANNEL_NAME_SELECTORS`` is tried in order and
        the first non-blank text wins. Failing that, the page title is used
        with a trailing " | Microsoft Teams" or " - Microsoft Teams"
        removed. Errors raised by the page are deliberately swallowed so
        detection stays best-effort.

        Returns:
            The detected name, or ``"(unknown)"`` when nothing usable is
            found.
        """
        for css in CHANNEL_NAME_SELECTORS:
            try:
                matches = page.locator(css)
                if await matches.count() <= 0:
                    continue
                label = (await matches.first.inner_text()).strip()
                if label:
                    return label
            except Exception:
                continue

        try:
            page_title = await page.title()
            if page_title and page_title != "Microsoft Teams":
                for tail in (" | Microsoft Teams", " - Microsoft Teams"):
                    if page_title.endswith(tail):
                        page_title = page_title[: len(page_title) - len(tail)]
                return page_title.strip() or "(unknown)"
        except Exception:
            pass

        return "(unknown)"
TeamsNavigator._find_matching_result method · python · L75-L89 (15 LOC)
browser/navigator.py
    async def _find_matching_result(locator, target: str):
        """Pick the first search result whose text contains *target*.

        The comparison lower-cases both sides. A result whose text cannot
        be read (any exception) is skipped rather than aborting the scan.

        Returns:
            ``locator.nth(i)`` for the first matching result, or ``None``
            if no result matches.
        """
        needle = target.lower()
        for position in range(await locator.count()):
            try:
                haystack = (await locator.nth(position).inner_text()).lower()
                if needle in haystack:
                    return locator.nth(position)
            except Exception:
                continue
        return None
Repobility's GitHub App fixes findings like these · https://github.com/apps/repobility-bot
TeamsNavigator._name_fallbacks method · python · L92-L106 (15 LOC)
browser/navigator.py
    def _name_fallbacks(target: str) -> list[str]:
        """Build progressively shorter variants of a name for retrying a search.

        Three or more words give "first last" followed by "first"; two
        words give just "first"; a single word (or blank input) gives
        nothing.

        "Jonas de Oliveira" -> ["Jonas Oliveira", "Jonas"]
        "Jonas Oliveira"    -> ["Jonas"]
        "Jonas"             -> []
        """
        parts = target.split()
        if len(parts) < 2:
            return []
        first, last = parts[0], parts[-1]
        if len(parts) == 2:
            return [first]
        return [f"{first} {last}", first]
TeamsNavigator.search_and_navigate method · python · L108-L192 (85 LOC)
browser/navigator.py
    async def search_and_navigate(self, page, target: str) -> dict:
        """Search for *target* in Teams and navigate to it.

        Returns a dict with:
        - ``"status": "navigated"`` and ``"detected_channel"`` on success
        - ``"status": "error"`` with ``"error"`` detail on failure
        """
        # Find the search bar
        search_bar = await self._find_element(page, SEARCH_SELECTORS, timeout_ms=10_000)
        if search_bar is None:
            # Auto-recover: reload page and retry once
            logger.info("Search bar not found, reloading page and retrying")
            try:
                await page.reload(wait_until="domcontentloaded")
                await asyncio.sleep(3)
            except Exception as exc:
                logger.warning("Page reload failed during recovery: %s", exc)
            search_bar = await self._find_element(page, SEARCH_SELECTORS, timeout_ms=10_000)
            if search_bar is None:
                return {
                  
PlaywrightTeamsPoster.__init__ method · python · L41-L51 (11 LOC)
browser/teams_poster.py
    def __init__(
        self,
        manager: Optional[TeamsBrowserManager] = None,
        navigator: Optional[TeamsNavigator] = None,
    ):
        """Set up the poster with its collaborators and empty session state.

        Args:
            manager: Browser manager to use; a new ``TeamsBrowserManager()``
                is created when this is falsy.
            navigator: Navigator to use; a new ``TeamsNavigator()`` is
                created when this is falsy.
        """
        # Session fields start out unset.
        self._pw = None
        self._page = None
        self._compose = None
        self._pending_message: Optional[str] = None

        self._manager = manager if manager else TeamsBrowserManager()
        self._navigator = navigator if navigator else TeamsNavigator()
‹ prevpage 2 / 11next ›