Function bodies 522 total
MailStore.get_message method · python · L146-L208 (63 LOC)apple_mail/mail.py
def get_message(self, message_id: str) -> dict:
"""Get full message content by message ID, including decoded body."""
mid_esc = _escape_osascript(message_id)
script = f'''
tell application "Mail"
set foundMsg to missing value
repeat with acct in accounts
repeat with mb in mailboxes of acct
try
set matchMsgs to (messages of mb whose message id is "{mid_esc}")
if (count of matchMsgs) > 0 then
set foundMsg to item 1 of matchMsgs
exit repeat
end if
end try
end repeat
if foundMsg is not missing value then exit repeat
end repeat
if foundMsg is missing value then return "ERROR: Message not found"
set subj to subject of foundMsg
set sndr to sender of foundMsg
set dt to date sent of foundMsg as text
set isRead to (read status of foundMsg)
set isFlagged to (flagged status of foundMsg)
set toListMailStore.search_messages method · python · L210-L260 (51 LOC)apple_mail/mail.py
def search_messages(self, query: str, mailbox: str = "INBOX", account: str = "", limit: int = 25) -> list[dict]:
"""Search messages by subject or sender text in a mailbox."""
limit = min(max(1, limit), 100)
query_esc = _escape_osascript(query)
mailbox_esc = _escape_osascript(mailbox)
if account:
account_esc = _escape_osascript(account)
mailbox_ref = f'mailbox "{mailbox_esc}" of account "{account_esc}"'
else:
mailbox_ref = f'mailbox "{mailbox_esc}" of account 1'
script = f'''
tell application "Mail"
set mb to {mailbox_ref}
set matchMsgs to (messages of mb whose subject contains "{query_esc}" or sender contains "{query_esc}")
set msgCount to count of matchMsgs
if msgCount is 0 then return ""
set maxMsg to msgCount
if maxMsg > {limit} then set maxMsg to {limit}
set output to ""
repeat with i from 1 to maxMsg
set msg to item i of matchMsgs
set mid MailStore.mark_read method · python · L262-L287 (26 LOC)apple_mail/mail.py
def mark_read(self, message_id: str, read: bool = True) -> dict:
    """Set the read status of the first Mail message matching *message_id*.

    The generated AppleScript walks every mailbox of every account and
    updates the first message whose ``message id`` equals the escaped ID.

    Args:
        message_id: Message ID to look for; escaped before interpolation.
        read: True to mark the message read, False to mark it unread.

    Returns:
        The runner's result unchanged when it contains an ``"error"`` key,
        ``{"error": ...}`` when the script output starts with ``ERROR:``,
        otherwise a status dict echoing ``message_id`` and ``read``.
    """
    flag_literal = "true" if read else "false"
    escaped_id = _escape_osascript(message_id)
    script = f'''
tell application "Mail"
repeat with acct in accounts
repeat with mb in mailboxes of acct
try
set matchMsgs to (messages of mb whose message id is "{escaped_id}")
if (count of matchMsgs) > 0 then
set read status of item 1 of matchMsgs to {flag_literal}
return "OK"
end if
end try
end repeat
end repeat
return "ERROR: Message not found"
end tell
'''
    outcome = _run_applescript(script)
    if "error" not in outcome:
        if outcome["output"].startswith("ERROR:"):
            # The script reports "not found" through its own output text.
            return {"error": outcome["output"]}
        return {"status": "ok", "message_id": message_id, "read": read}
    return outcome
# MailStore.mark_flagged method · python · L289-L314 (26 LOC)apple_mail/mail.py
def mark_flagged(self, message_id: str, flagged: bool = True) -> dict:
"""Mark a message as flagged or unflagged."""
mid_esc = _escape_osascript(message_id)
flag_val = "true" if flagged else "false"
script = f'''
tell application "Mail"
repeat with acct in accounts
repeat with mb in mailboxes of acct
try
set matchMsgs to (messages of mb whose message id is "{mid_esc}")
if (count of matchMsgs) > 0 then
set flagged status of item 1 of matchMsgs to {flag_val}
return "OK"
end if
end try
end repeat
end repeat
return "ERROR: Message not found"
end tell
'''
result = _run_applescript(script)
if "error" in result:
return result
if result["output"].startswith("ERROR:"):
return {"error": result["output"]}
return {"status": "ok", "message_id": message_id, "flagged": flaMailStore.move_message method · python · L316-L347 (32 LOC)apple_mail/mail.py
def move_message(self, message_id: str, target_mailbox: str, target_account: str = "") -> dict:
"""Move a message to a different mailbox."""
mid_esc = _escape_osascript(message_id)
target_mb_esc = _escape_osascript(target_mailbox)
if target_account:
target_acct_esc = _escape_osascript(target_account)
target_ref = f'mailbox "{target_mb_esc}" of account "{target_acct_esc}"'
else:
target_ref = f'mailbox "{target_mb_esc}" of account 1'
script = f'''
tell application "Mail"
set targetMb to {target_ref}
repeat with acct in accounts
repeat with mb in mailboxes of acct
try
set matchMsgs to (messages of mb whose message id is "{mid_esc}")
if (count of matchMsgs) > 0 then
move item 1 of matchMsgs to targetMb
return "OK"
end if
end try
end repeat
end repeat
return "ERRMailStore.reply_message method · python · L349-L421 (73 LOC)apple_mail/mail.py
def reply_message(
self,
message_id: str,
body: str,
reply_all: bool = False,
cc: Optional[list[str]] = None,
bcc: Optional[list[str]] = None,
confirm_send: bool = False,
) -> dict:
"""Reply to an existing message in-thread. confirm_send must be True."""
if not confirm_send:
return {"error": "confirm_send must be True. Please confirm with the user before sending."}
mid_esc = _escape_osascript(message_id)
body_esc = _escape_osascript(body)
if reply_all:
reply_flag = "with opening window and reply to all"
else:
reply_flag = "with opening window without reply to all"
# Build additional recipient blocks for extra CC/BCC
cc_block = ""
if cc:
for addr in cc:
addr_esc = _escape_osascript(addr)
cc_block += f'\nmake new cc recipient at end of cc recipients of replyMsg with properMailStore.send_message method · python · L423-L480 (58 LOC)apple_mail/mail.py
def send_message(
self,
to: list[str],
subject: str,
body: str,
cc: Optional[list[str]] = None,
bcc: Optional[list[str]] = None,
confirm_send: bool = False,
) -> dict:
"""Compose and send an email. confirm_send must be True."""
if not confirm_send:
return {"error": "confirm_send must be True. Please confirm with the user before sending."}
subject_esc = _escape_osascript(subject)
body_esc = _escape_osascript(body)
# Build recipient AppleScript blocks
to_block = ""
for addr in to:
addr_esc = _escape_osascript(addr)
to_block += f'\nmake new to recipient at end of to recipients with properties {{address:"{addr_esc}"}}'
cc_block = ""
if cc:
for addr in cc:
addr_esc = _escape_osascript(addr)
cc_block += f'\nmake new cc recipient at end of cc recipients with properties {{addresWant fix-PRs on findings? Install Repobility's GitHub App · github.com/apps/repobility-bot
decode_attributed_body function · python · L14-L39 (26 LOC)apple_messages/messages.py
def decode_attributed_body(blob: bytes) -> str | None:
"""Decode an attributedBody typedstream blob to plain text.
Returns None if the blob cannot be decoded.
"""
if not blob or b"NSString" not in blob:
return None
try:
content = blob.split(b"NSString")[1][5:] # Skip 5-byte preamble
indicator = content[0]
if indicator == 0x81:
length = int.from_bytes(content[1:3], "little")
start = 3
elif indicator == 0x82:
length = int.from_bytes(content[1:5], "little")
start = 5
elif indicator == 0x83:
length = int.from_bytes(content[1:9], "little")
start = 9
else:
length = indicator
start = 1
text = content[start : start + length].decode("utf-8", errors="replace")
return text if text else None
except (IndexError, ValueError):
return NoneMessageStore.__init__ method · python · L53-L71 (19 LOC)apple_messages/messages.py
def __init__(
    self,
    db_path: Path | None = None,
    communicate_script: Path | None = None,
    profile_db_path: Path | None = None,
):
    """Resolve storage paths and initialise the profile database.

    Args:
        db_path: Messages database; falls back to
            ``~/Library/Messages/chat.db`` when falsy.
        communicate_script: Send script; falls back to
            ``scripts/communicate.sh`` under the project root when falsy.
        profile_db_path: Thread-profile database; falls back to
            ``data/imessage-thread-profiles.db`` under the project root
            when falsy.
    """
    if db_path:
        self.db_path = Path(db_path)
    else:
        self.db_path = Path.home() / "Library" / "Messages" / "chat.db"

    if communicate_script:
        self.communicate_script = Path(communicate_script)
    else:
        self.communicate_script = _project_root() / "scripts" / "communicate.sh"

    if profile_db_path:
        self.profile_db_path = Path(profile_db_path)
    else:
        self.profile_db_path = _project_root() / "data" / "imessage-thread-profiles.db"

    # The profile database's directory must exist before the schema is created.
    self.profile_db_path.parent.mkdir(parents=True, exist_ok=True)
    self._init_profile_db()
# MessageStore._open_chat_db method · python · L73-L79 (7 LOC)apple_messages/messages.py
def _open_chat_db(self) -> sqlite3.Connection:
    """Open the Messages chat database read-only.

    Returns:
        An open connection created from an SQLite ``file:`` URI with
        ``mode=ro`` and ``sqlite3.Row`` as its row factory. The connection
        is returned open; this method never closes it.

    Raises:
        RuntimeError: if the module-level ``_IS_MACOS`` flag is false.
    """
    if not _IS_MACOS:
        raise RuntimeError(_PLATFORM_ERROR["error"])

    from urllib.parse import quote

    # Percent-encode the path: in an SQLite URI a raw "?" starts the query
    # string, "#" starts a fragment and "%HH" is decoded, so an unescaped
    # path containing any of them would open the wrong file or fail.
    uri = f"file:{quote(str(self.db_path))}?mode=ro"
    conn = sqlite3.connect(uri, uri=True)
    conn.row_factory = sqlite3.Row
    return conn
# MessageStore._init_profile_db method · python · L98-L121 (24 LOC)apple_messages/messages.py
def _init_profile_db(self) -> None:
    """Create the thread-profile tables if they do not exist yet.

    Runs one script that creates ``thread_profiles`` (keyed by
    ``chat_identifier``) and ``thread_members`` (keyed by
    ``chat_identifier`` + ``handle``), then commits.
    """
    schema_script = """
CREATE TABLE IF NOT EXISTS thread_profiles (
chat_identifier TEXT PRIMARY KEY,
display_name TEXT,
preferred_handle TEXT,
notes TEXT,
auto_reply_mode TEXT NOT NULL DEFAULT 'manual',
last_seen_at TEXT,
last_sender TEXT,
updated_at_utc TEXT NOT NULL
);
CREATE TABLE IF NOT EXISTS thread_members (
chat_identifier TEXT NOT NULL,
handle TEXT NOT NULL,
last_seen_at TEXT,
PRIMARY KEY(chat_identifier, handle)
);
"""
    with self._open_profile_db() as profile_conn:
        profile_conn.executescript(schema_script)
        profile_conn.commit()
# MessageStore._upsert_thread_observation method · python · L123-L168 (46 LOC)apple_messages/messages.py
def _upsert_thread_observation(self, chat_identifier: str, sender: str, date_local: str) -> None:
chat_identifier = (chat_identifier or "").strip()
sender = (sender or "").strip()
date_local = (date_local or "").strip()
if not chat_identifier:
return
now = self._utc_now_iso()
with self._open_profile_db() as conn:
conn.execute(
"""
INSERT INTO thread_profiles(
chat_identifier, display_name, preferred_handle, notes, auto_reply_mode,
last_seen_at, last_sender, updated_at_utc
)
VALUES(?, NULL, ?, NULL, 'manual', ?, ?, ?)
ON CONFLICT(chat_identifier) DO UPDATE SET
preferred_handle = CASE
WHEN COALESCE(thread_profiles.preferred_handle, '') = '' THEN excluded.preferred_handle
ELSE thread_profiles.preferred_handle
MessageStore._record_observations method · python · L170-L180 (11 LOC)apple_messages/messages.py
def _record_observations(self, rows: list[dict]) -> None:
seen: set[tuple[str, str, str]] = set()
for row in rows:
chat_identifier = str(row.get("chat_identifier", "")).strip()
sender = str(row.get("sender", "")).strip()
date_local = str(row.get("date_local", "")).strip()
key = (chat_identifier, sender, date_local)
if key in seen:
continue
seen.add(key)
self._upsert_thread_observation(chat_identifier, sender, date_local)MessageStore._load_profiles method · python · L182-L223 (42 LOC)apple_messages/messages.py
def _load_profiles(self, chat_ids: list[str]) -> dict[str, dict]:
chat_ids = [c for c in chat_ids if c]
if not chat_ids:
return {}
placeholders = ",".join("?" for _ in chat_ids)
with self._open_profile_db() as conn:
rows = conn.execute(
f"""
SELECT
p.chat_identifier,
COALESCE(p.display_name, '') AS display_name,
COALESCE(p.preferred_handle, '') AS preferred_handle,
COALESCE(p.notes, '') AS notes,
COALESCE(p.auto_reply_mode, 'manual') AS auto_reply_mode,
COALESCE(p.last_seen_at, '') AS last_seen_at,
COALESCE(p.last_sender, '') AS last_sender,
COALESCE(
(
SELECT group_concat(m.handle, '|||')
FROM thread_members m
WHERE m.chat_identifiMessageStore.get_messages method · python · L225-L290 (66 LOC)apple_messages/messages.py
def get_messages(
self,
minutes: int = 60,
limit: int = 25,
include_from_me: bool = True,
conversation: str = "",
) -> list[dict]:
"""Get recent iMessages from local chat.db."""
if not _IS_MACOS:
return [_PLATFORM_ERROR]
safe_minutes = self._normalize_minutes(minutes)
safe_limit = self._normalize_limit(limit)
where = [
"m.date > ((strftime('%s', 'now') - 978307200 - (? * 60)) * 1000000000)",
]
params: list[object] = [safe_minutes]
if not include_from_me:
where.append("COALESCE(m.is_from_me, 0) = 0")
if conversation:
where.append(
"("
"lower(COALESCE(h.id, '')) LIKE ? "
"OR lower(COALESCE(c.chat_identifier, '')) LIKE ?"
")"
)
needle = f"%{conversation.lower()}%"
params.extend([needle, needle])
params.append(safe_limRepobility's GitHub App fixes findings like these · https://github.com/apps/repobility-bot
MessageStore.search_messages method · python · L292-L372 (81 LOC)apple_messages/messages.py
def search_messages(
self,
query: str,
minutes: int = 24 * 60,
limit: int = 25,
include_from_me: bool = True,
) -> list[dict]:
"""Search iMessages by text, sender, or conversation identifier."""
if not _IS_MACOS:
return [_PLATFORM_ERROR]
query = (query or "").strip()
if not query:
return []
safe_minutes = self._normalize_minutes(minutes)
safe_limit = self._normalize_limit(limit)
where = [
"m.date > ((strftime('%s', 'now') - 978307200 - (? * 60)) * 1000000000)",
"("
"lower(COALESCE(m.text, '')) LIKE ? "
"OR lower(COALESCE(h.id, '')) LIKE ? "
"OR lower(COALESCE(c.chat_identifier, '')) LIKE ? "
"OR (m.text IS NULL AND m.attributedBody IS NOT NULL)"
")",
]
params: list[object] = [safe_minutes]
needle = f"%{query.lower()}%"
params.extend([needle, needMessageStore.list_threads method · python · L374-L446 (73 LOC)apple_messages/messages.py
def list_threads(self, minutes: int = 7 * 24 * 60, limit: int = 50) -> list[dict]:
"""List active iMessage threads with persisted profile metadata."""
if not _IS_MACOS:
return [_PLATFORM_ERROR]
safe_minutes = self._normalize_minutes(minutes)
safe_limit = self._normalize_limit(limit)
sql = """
SELECT
COALESCE(c.chat_identifier, '') AS chat_identifier,
datetime(MAX(m.date) / 1000000000 + 978307200, 'unixepoch', 'localtime') AS last_message_date_local,
(
SELECT m2.text
FROM chat_message_join cmj2
JOIN message m2 ON m2.ROWID = cmj2.message_id
WHERE cmj2.chat_id = c.ROWID
ORDER BY m2.date DESC
LIMIT 1
) AS last_message_text,
(
SELECT m3.attributedBody
FROM chat_message_join cmj3
MessageStore.get_thread_messages method · python · L448-L508 (61 LOC)apple_messages/messages.py
def get_thread_messages(
self,
chat_identifier: str,
minutes: int = 7 * 24 * 60,
limit: int = 50,
include_from_me: bool = True,
) -> list[dict]:
"""Get messages for a specific iMessage thread by chat_identifier."""
if not _IS_MACOS:
return [_PLATFORM_ERROR]
chat_identifier = (chat_identifier or "").strip()
if not chat_identifier:
return [{"error": "chat_identifier is required"}]
safe_minutes = self._normalize_minutes(minutes)
safe_limit = self._normalize_limit(limit)
where = [
"c.chat_identifier = ?",
"m.date > ((strftime('%s', 'now') - 978307200 - (? * 60)) * 1000000000)",
]
params: list[object] = [chat_identifier, safe_minutes]
if not include_from_me:
where.append("COALESCE(m.is_from_me, 0) = 0")
params.append(safe_limit)
sql = f"""
SELECT
m.guid AS guid,MessageStore.get_thread_context method · python · L510-L558 (49 LOC)apple_messages/messages.py
def get_thread_context(
self,
chat_identifier: str,
minutes: int = 7 * 24 * 60,
limit: int = 20,
) -> dict:
"""Get thread profile + recent message context for orchestration."""
chat_identifier = (chat_identifier or "").strip()
if not chat_identifier:
return {"error": "chat_identifier is required"}
messages = self.get_thread_messages(
chat_identifier=chat_identifier,
minutes=minutes,
limit=limit,
include_from_me=True,
)
if messages and isinstance(messages[0], dict) and messages[0].get("error"):
return {"error": messages[0]["error"]}
profile = self._load_profiles([chat_identifier]).get(chat_identifier, {})
participants = sorted(
{
str(m.get("sender", "")).strip()
for m in messages
if str(m.get("sender", "")).strip()
}
)
if profiMessageStore._resolve_chat_guid method · python · L560-L577 (18 LOC)apple_messages/messages.py
def _resolve_chat_guid(self, chat_identifier: str) -> str | None:
"""Look up the Messages.app guid for a chat_identifier.
The AppleScript ``chat id`` property corresponds to the ``guid`` column
in chat.db (e.g. ``iMessage;+;chat336858519315148840``), not the
``chat_identifier`` column. Returns *None* when the identifier is not
found so the caller can fall back gracefully.
"""
try:
with self._open_chat_db() as conn:
row = conn.execute(
"SELECT guid FROM chat WHERE chat_identifier = ? LIMIT 1",
(chat_identifier,),
).fetchone()
return row["guid"] if row else None
except (sqlite3.OperationalError, sqlite3.IntegrityError) as exc:
logger.error("SQLite error resolving chat guid: %s", exc)
return NoneMessageStore.send_message method · python · L579-L638 (60 LOC)apple_messages/messages.py
def send_message(
self,
to: str = "",
body: str = "",
confirm_send: bool = False,
chat_identifier: str = "",
) -> dict:
"""Send an iMessage using communicate.sh. Requires confirm_send=True."""
if not _IS_MACOS:
return _PLATFORM_ERROR
to = (to or "").strip()
body = (body or "").strip()
chat_identifier = (chat_identifier or "").strip()
if not to and not chat_identifier:
return {"error": "Missing recipient: provide 'to' or 'chat_identifier'"}
if not body:
return {"error": "Missing message body"}
if not confirm_send:
route = {"chat_identifier": chat_identifier} if chat_identifier else {"to": to}
return {
"status": "preview",
"channel": "imessage",
**route,
"body": body,
"requires_confirmation": True,
}
if not self.communic_escape_osascript function · python · L8-L16 (9 LOC)apple_notifications/notifier.py
def _escape_osascript(text: str) -> str:
"""Escape text for safe use in AppleScript strings."""
return (
text.replace("\\", "\\\\")
.replace('"', '\\"')
.replace("\n", "\\n")
.replace("\r", "\\r")
.replace("\t", "\\t")
)Notifier.send method · python · L23-L55 (33 LOC)apple_notifications/notifier.py
def send(
title: str,
message: str,
subtitle: Optional[str] = None,
sound: Optional[str] = "default",
) -> dict:
if not _IS_MACOS:
return {"error": "Notifications are only available on macOS"}
title_escaped = _escape_osascript(title)
message_escaped = _escape_osascript(message)
script = f'display notification "{message_escaped}" with title "{title_escaped}"'
if subtitle:
script += f' subtitle "{_escape_osascript(subtitle)}"'
if sound:
script += f' sound name "{_escape_osascript(sound)}"'
try:
subprocess.run(
["osascript", "-e", script],
capture_output=True,
text=True,
timeout=5,
check=True,
)
return {"status": "sent", "title": title, "message": message}
except subprocess.TimeoutExpired:
return {"error": "Notification tiRepobility — same analyzer, your code, free for public repos · /scan/
Notifier.send_alert method · python · L58-L81 (24 LOC)apple_notifications/notifier.py
def send_alert(title: str, message: str) -> dict:
    """Show an AppleScript ``display alert`` through ``osascript``.

    Args:
        title: Alert title; escaped before interpolation.
        message: Alert message text; escaped before interpolation.

    Returns:
        ``{"status": "sent", "title": title}`` on success, otherwise a dict
        with an ``"error"`` key (non-macOS platform, 30-second timeout,
        non-zero osascript exit, or osascript missing).
    """
    if not _IS_MACOS:
        return {"error": "Alerts are only available on macOS"}

    script = 'display alert "{}" message "{}"'.format(
        _escape_osascript(title),
        _escape_osascript(message),
    )
    command = ["osascript", "-e", script]
    try:
        subprocess.run(command, capture_output=True, text=True, timeout=30, check=True)
    except subprocess.TimeoutExpired:
        return {"error": "Alert timed out (user may not have responded)"}
    except subprocess.CalledProcessError as e:
        return {"error": f"osascript failed: {e.stderr.strip()}"}
    except FileNotFoundError:
        return {"error": "osascript not found — not running on macOS?"}
    else:
        return {"status": "sent", "title": title}
# _reminder_to_dict function · python · L36-L72 (37 LOC)apple_reminders/eventkit.py
def _reminder_to_dict(reminder) -> dict:
"""Convert an EKReminder to a plain dict."""
due_date = None
due_components = reminder.dueDateComponents()
if due_components:
cal = NSCalendar.currentCalendar()
ns_date = cal.dateFromComponents_(due_components)
if ns_date:
due_date = datetime.fromtimestamp(
ns_date.timeIntervalSince1970()
).isoformat()
completion_date = None
comp_ns = reminder.completionDate()
if comp_ns:
completion_date = datetime.fromtimestamp(
comp_ns.timeIntervalSince1970()
).isoformat()
creation_date = None
create_ns = reminder.creationDate()
if create_ns:
creation_date = datetime.fromtimestamp(
create_ns.timeIntervalSince1970()
).isoformat()
return {
"id": str(reminder.calendarItemExternalIdentifier()),
"title": str(reminder.title()) if reminder.title() else "",
"notes": str(reminder.nReminderStore._ensure_store method · python · L90-L107 (18 LOC)apple_reminders/eventkit.py
def _ensure_store(self):
    """Return the shared EKEventStore, creating it and asking for access once.

    The store object and the access decision are cached on the instance, so
    later calls reuse both.

    Returns:
        The EKEventStore when access was granted; ``_PLATFORM_ERROR`` when
        EventKit is unavailable; ``_PERMISSION_ERROR`` when access was not
        granted.
    """
    if not _EVENTKIT_AVAILABLE:
        return _PLATFORM_ERROR

    if self._store is None:
        self._store = EventKit.EKEventStore.alloc().init()

    if self._access_granted is None:
        self._access_granted = self._request_access()

    return self._store if self._access_granted else _PERMISSION_ERROR
# ReminderStore._request_access method · python · L109-L120 (12 LOC)apple_reminders/eventkit.py
def _request_access(self) -> bool:
"""Request Reminders access synchronously."""
granted_flag = threading.Event()
result = {"granted": False}
def handler(granted, error):
result["granted"] = granted
granted_flag.set()
self._store.requestFullAccessToRemindersWithCompletion_(handler)
granted_flag.wait(timeout=30)
return result["granted"]ReminderStore._get_reminder_list_by_name method · python · L122-L127 (6 LOC)apple_reminders/eventkit.py
def _get_reminder_list_by_name(self, name: str):
    """Return the first reminder list whose title equals *name*, else None."""
    reminder_lists = self._store.calendarsForEntityType_(_EK_ENTITY_TYPE_REMINDER)
    matching = (candidate for candidate in reminder_lists if candidate.title() == name)
    return next(matching, None)
# ReminderStore._fetch_reminders method · python · L129-L142 (14 LOC)apple_reminders/eventkit.py
def _fetch_reminders(self, predicate) -> list:
"""Execute a reminder fetch with predicate, blocking until complete."""
results = []
done = threading.Event()
def handler(reminders):
if reminders:
for r in reminders:
results.append(r)
done.set()
self._store.fetchRemindersMatchingPredicate_completion_(predicate, handler)
done.wait(timeout=10)
return resultsReminderStore._find_reminder_by_id method · python · L144-L154 (11 LOC)apple_reminders/eventkit.py
def _find_reminder_by_id(self, reminder_id: str):
"""Find a single EKReminder by its external identifier."""
item = self._store.calendarItemWithIdentifier_(reminder_id)
if item is not None:
return item
# Fallback: fetch all reminders and filter
predicate = self._store.predicateForRemindersInCalendars_(None)
for r in self._fetch_reminders(predicate):
if str(r.calendarItemExternalIdentifier()) == reminder_id:
return r
return NoneReminderStore.list_reminder_lists method · python · L160-L186 (27 LOC)apple_reminders/eventkit.py
def list_reminder_lists(self) -> list[dict]:
"""Return all reminder lists (calendars with entityType Reminder)."""
store = self._ensure_store()
if isinstance(store, dict):
return [store]
try:
calendars = store.calendarsForEntityType_(_EK_ENTITY_TYPE_REMINDER)
result = []
for cal in calendars:
color = cal.color()
color_hex = None
if color:
r = int(color.redComponent() * 255)
g = int(color.greenComponent() * 255)
b = int(color.blueComponent() * 255)
color_hex = f"#{r:02x}{g:02x}{b:02x}"
result.append({
"name": str(cal.title()),
"source": str(cal.source().title()) if cal.source() else None,
"color": color_hex,
})
return result
except (AttributeError, TypeError, RuntimeRepobility · severity-and-effort ranking · https://repobility.com
ReminderStore.list_reminders method · python · L188-L225 (38 LOC)apple_reminders/eventkit.py
def list_reminders(
self,
list_name: Optional[str] = None,
completed: Optional[bool] = None,
) -> list[dict]:
"""Fetch reminders, optionally filtered by list and completion status.
completed=None returns all, True returns completed only,
False returns incomplete only.
"""
store = self._ensure_store()
if isinstance(store, dict):
return [store]
try:
calendars = None
if list_name:
cal = self._get_reminder_list_by_name(list_name)
if not cal:
return [{"error": f"Reminder list '{list_name}' not found"}]
calendars = [cal]
if completed is True:
predicate = store.predicateForCompletedRemindersWithCompletionDateStarting_ending_calendars_(
None, None, calendars
)
elif completed is False:
predicate = store.predicateReminderStore.create_reminder method · python · L227-L283 (57 LOC)apple_reminders/eventkit.py
def create_reminder(
self,
title: str,
list_name: Optional[str] = None,
due_date: Optional[str] = None,
priority: Optional[int] = None,
notes: Optional[str] = None,
) -> dict:
"""Create a new reminder and return its dict representation.
Args:
title: Reminder title.
list_name: Target reminder list name, or default list if None.
due_date: ISO format date string (e.g. "2025-03-15" or "2025-03-15T14:00:00").
priority: 0=none, 1=high, 4=medium, 9=low.
notes: Optional notes text.
"""
store = self._ensure_store()
if isinstance(store, dict):
return store
try:
reminder = EventKit.EKReminder.reminderWithEventStore_(store)
reminder.setTitle_(title)
if list_name:
cal = self._get_reminder_list_by_name(list_name)
if not cal:
return {"errReminderStore.complete_reminder method · python · L285-L304 (20 LOC)apple_reminders/eventkit.py
def complete_reminder(self, reminder_id: str) -> dict:
    """Mark the reminder identified by *reminder_id* as completed.

    Returns:
        The reminder converted by ``_reminder_to_dict`` on success; the
        error dict from ``_ensure_store`` when no store is available;
        otherwise a dict with an ``"error"`` key (not found, save failed,
        or a caught AttributeError/TypeError/RuntimeError).
    """
    store = self._ensure_store()
    if isinstance(store, dict):
        # _ensure_store signals failure by returning an error dict.
        return store

    try:
        target = self._find_reminder_by_id(reminder_id)
        if target is None:
            return {"error": f"Reminder not found: {reminder_id}"}
        target.setCompleted_(True)
        saved, save_error = store.saveReminder_commit_error_(target, True, None)
        if saved:
            return _reminder_to_dict(target)
        return {"error": f"Failed to complete reminder: {save_error}"}
    except (AttributeError, TypeError, RuntimeError) as e:
        logger.error("PyObjC error completing reminder: %s", e)
        return {"error": f"Failed to complete reminder: {e}"}
# ReminderStore.delete_reminder method · python · L306-L323 (18 LOC)apple_reminders/eventkit.py
def delete_reminder(self, reminder_id: str) -> dict:
    """Delete the reminder identified by *reminder_id*.

    Returns:
        ``{"status": "deleted", "reminder_id": reminder_id}`` on success;
        the error dict from ``_ensure_store`` when no store is available;
        otherwise a dict with an ``"error"`` key (not found, removal failed,
        or a caught AttributeError/TypeError/RuntimeError).
    """
    store = self._ensure_store()
    if isinstance(store, dict):
        # _ensure_store signals failure by returning an error dict.
        return store

    try:
        target = self._find_reminder_by_id(reminder_id)
        if target is None:
            return {"error": f"Reminder not found: {reminder_id}"}
        removed, remove_error = store.removeReminder_commit_error_(target, True, None)
        if removed:
            return {"status": "deleted", "reminder_id": reminder_id}
        return {"error": f"Failed to delete reminder: {remove_error}"}
    except (AttributeError, TypeError, RuntimeError) as e:
        logger.error("PyObjC error deleting reminder: %s", e)
        return {"error": f"Failed to delete reminder: {e}"}
# ReminderStore.search_reminders method · python · L325-L358 (34 LOC)apple_reminders/eventkit.py
def search_reminders(
self,
query: str,
include_completed: bool = False,
) -> list[dict]:
"""Search reminders by title text.
Args:
query: Text to search for in reminder titles (case-insensitive).
include_completed: If False (default), only search incomplete reminders.
"""
store = self._ensure_store()
if isinstance(store, dict):
return [store]
try:
if include_completed:
predicate = store.predicateForRemindersInCalendars_(None)
else:
predicate = store.predicateForIncompleteRemindersWithDueDateStarting_ending_calendars_(
None, None, None
)
raw_reminders = self._fetch_reminders(predicate)
query_lower = query.lower()
results = []
for r in raw_reminders:
title = str(r.title()) if r.title() else ""
if queryTeamsBrowserManager.__init__ method · python · L43-L51 (9 LOC)browser/manager.py
def __init__(
    self,
    cdp_port: int = DEFAULT_CDP_PORT,
    state_path: Optional[Path] = None,
    profile_dir: Optional[Path] = None,
):
    """Store the CDP port and the state-file / profile-directory paths.

    Falsy ``state_path`` or ``profile_dir`` values fall back to
    ``DEFAULT_STATE_PATH`` and ``DEFAULT_PROFILE_DIR`` respectively.
    """
    self.cdp_port = cdp_port
    self.state_path = state_path if state_path else DEFAULT_STATE_PATH
    self.profile_dir = profile_dir if profile_dir else DEFAULT_PROFILE_DIR
# TeamsBrowserManager._load_state method · python · L53-L58 (6 LOC)browser/manager.py
def _load_state(self) -> Optional[dict]:
"""Load browser state (PID, port) from disk."""
try:
return json.loads(self.state_path.read_text())
except (FileNotFoundError, json.JSONDecodeError, OSError):
return NoneTeamsBrowserManager._clear_state method · python · L65-L70 (6 LOC)browser/manager.py
def _clear_state(self) -> None:
"""Remove the state file."""
try:
self.state_path.unlink()
except FileNotFoundError:
passWant fix-PRs on findings? Install Repobility's GitHub App · github.com/apps/repobility-bot
TeamsBrowserManager.is_alive method · python · L72-L79 (8 LOC)browser/manager.py
def is_alive(self) -> bool:
    """Report whether the local CDP endpoint answers ``/json/version``.

    Returns True only when the request completes within 2 seconds with HTTP
    status 200; a URL, OS or timeout error yields False.
    """
    try:
        endpoint = f"http://127.0.0.1:{self.cdp_port}/json/version"
        with urlopen(endpoint, timeout=2) as response:
            answered_ok = response.status == 200
    except (URLError, OSError, TimeoutError):
        return False
    return answered_ok
# TeamsBrowserManager._find_chromium_path method · python · L82-L97 (16 LOC)browser/manager.py
def _find_chromium_path() -> Optional[str]:
"""Find the Playwright-bundled Chromium executable."""
if platform.system() == "Darwin":
cache = Path.home() / "Library" / "Caches" / "ms-playwright"
else:
cache = Path.home() / ".cache" / "ms-playwright"
for chromium_dir in sorted(cache.glob("chromium-*"), reverse=True):
if platform.system() == "Darwin":
candidate = (chromium_dir / "chrome-mac" / "Chromium.app"
/ "Contents" / "MacOS" / "Chromium")
else:
candidate = chromium_dir / "chrome-linux" / "chrome"
if candidate.exists():
return str(candidate)
return NoneTeamsBrowserManager.launch method · python · L99-L141 (43 LOC)browser/manager.py
def launch(self) -> dict:
"""Launch Chromium as a detached subprocess.
Returns a status dict with pid and cdp_port.
If already running, returns current status.
"""
if self.is_alive():
state = self._load_state() or {}
return {"status": "already_running", **state}
chromium = self._find_chromium_path()
if chromium is None:
return {
"status": "error",
"error": "Chromium not found. Run: playwright install chromium",
}
self.profile_dir.mkdir(parents=True, exist_ok=True)
proc = subprocess.Popen(
[chromium, f"--remote-debugging-port={self.cdp_port}",
f"--user-data-dir={self.profile_dir}",
"--no-first-run", "--no-default-browser-check"],
stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL,
start_new_session=True,
)
for _ in range(30):
if self.is_aTeamsBrowserManager.connect method · python · L143-L163 (21 LOC)browser/manager.py
async def connect(self):
    """Connect to running Chromium via CDP.

    Returns ``(playwright, browser)`` tuple. Caller must call
    ``await pw.stop()`` when done (this disconnects but does NOT
    kill the browser).

    Raises ``RuntimeError`` if playwright is not installed or the browser
    is not running. If the CDP connection itself fails, the Playwright
    driver started here is stopped before the error propagates.
    """
    if async_playwright is None:
        raise RuntimeError("playwright is not installed")
    if not self.is_alive():
        raise RuntimeError(
            "Browser is not running. Call open_teams_browser first."
        )
    pw = await async_playwright().start()
    try:
        browser = await pw.chromium.connect_over_cdp(
            f"http://127.0.0.1:{self.cdp_port}",
            timeout=10_000,
        )
    except Exception:
        # The caller never receives ``pw`` on failure, so nobody else could
        # stop the driver; stop it here to avoid leaking it.
        await pw.stop()
        raise
    return pw, browser
# TeamsBrowserManager.close method · python · L165-L180 (16 LOC)browser/manager.py
def close(self) -> dict:
    """Stop the Chromium process recorded in the state file.

    Sends SIGTERM to the recorded pid (a pid that no longer exists is
    ignored), removes the state file, then polls ``is_alive`` up to six
    times, half a second apart.

    Returns:
        ``{"status": "closed"}`` once the endpoint stops responding, or an
        error dict if it is still responding after the last poll.
    """
    recorded = self._load_state()
    if recorded and "pid" in recorded:
        pid = recorded["pid"]
        try:
            os.kill(pid, signal.SIGTERM)
            logger.info("Sent SIGTERM to pid %d", pid)
        except ProcessLookupError:
            pass
    self._clear_state()

    # Brief wait for process to exit after SIGTERM
    polls_left = 6
    while polls_left:
        if not self.is_alive():
            return {"status": "closed"}
        time.sleep(0.5)
        polls_left -= 1
    return {"status": "error", "error": "Browser still running after SIGTERM"}
# TeamsNavigator._find_element method · python · L34-L46 (13 LOC)browser/navigator.py
async def _find_element(page, selectors, timeout_ms: int = 10_000):
"""Try each selector with retries. Returns first matching locator or None."""
elapsed = 0
interval_ms = 1_000
while True:
for selector in selectors:
locator = page.locator(selector)
if await locator.count() > 0:
return locator
elapsed += interval_ms
if elapsed >= timeout_ms:
return None
await asyncio.sleep(interval_ms / 1_000)TeamsNavigator._detect_channel_name method · python · L49-L72 (24 LOC)browser/navigator.py
async def _detect_channel_name(page) -> str:
    """Detect active channel/conversation name from DOM.

    Tries each selector in ``CHANNEL_NAME_SELECTORS`` and returns the first
    non-empty stripped text. Otherwise falls back to the page title with a
    trailing " | Microsoft Teams" / " - Microsoft Teams" removed. Returns
    "(unknown)" when neither source yields a name.
    """
    unknown = "(unknown)"
    for css in CHANNEL_NAME_SELECTORS:
        try:
            matches = page.locator(css)
            if await matches.count() > 0:
                label = (await matches.first.inner_text()).strip()
                if label:
                    return label
        except Exception:
            # Best-effort probing: a failing selector means "try the next".
            continue

    try:
        page_title = await page.title()
        if page_title and page_title not in ("Microsoft Teams", ""):
            for trailer in (" | Microsoft Teams", " - Microsoft Teams"):
                if page_title.endswith(trailer):
                    page_title = page_title[: -len(trailer)]
            return page_title.strip() or unknown
    except Exception:
        pass
    return unknown
# TeamsNavigator._find_matching_result method · python · L75-L89 (15 LOC)browser/navigator.py
async def _find_matching_result(locator, target: str):
"""Find the search result whose text contains *target*.
Returns the matching element locator, or ``None`` if no match.
"""
target_lower = target.lower()
count = await locator.count()
for i in range(count):
try:
text = await locator.nth(i).inner_text()
if target_lower in text.lower():
return locator.nth(i)
except Exception:
continue
return NoneRepobility's GitHub App fixes findings like these · https://github.com/apps/repobility-bot
TeamsNavigator._name_fallbacks method · python · L92-L106 (15 LOC)browser/navigator.py
def _name_fallbacks(target: str) -> list[str]:
"""Generate shorter name variants for search fallback.
For "Jonas de Oliveira" returns ["Jonas Oliveira", "Jonas"].
For "Jonas Oliveira" returns ["Jonas"].
For "Jonas" returns [].
"""
words = target.split()
if len(words) <= 1:
return []
fallbacks = []
if len(words) > 2:
fallbacks.append(f"{words[0]} {words[-1]}")
fallbacks.append(words[0])
return fallbacksTeamsNavigator.search_and_navigate method · python · L108-L192 (85 LOC)browser/navigator.py
async def search_and_navigate(self, page, target: str) -> dict:
"""Search for *target* in Teams and navigate to it.
Returns a dict with:
- ``"status": "navigated"`` and ``"detected_channel"`` on success
- ``"status": "error"`` with ``"error"`` detail on failure
"""
# Find the search bar
search_bar = await self._find_element(page, SEARCH_SELECTORS, timeout_ms=10_000)
if search_bar is None:
# Auto-recover: reload page and retry once
logger.info("Search bar not found, reloading page and retrying")
try:
await page.reload(wait_until="domcontentloaded")
await asyncio.sleep(3)
except Exception as exc:
logger.warning("Page reload failed during recovery: %s", exc)
search_bar = await self._find_element(page, SEARCH_SELECTORS, timeout_ms=10_000)
if search_bar is None:
return {
PlaywrightTeamsPoster.__init__ method · python · L41-L51 (11 LOC)browser/teams_poster.py
def __init__(
    self,
    manager: Optional[TeamsBrowserManager] = None,
    navigator: Optional[TeamsNavigator] = None,
):
    """Store the collaborators and reset the per-post state.

    A falsy *manager* or *navigator* is replaced by a default-constructed
    ``TeamsBrowserManager`` / ``TeamsNavigator``. The Playwright handle,
    page, compose reference and pending message all start as None.
    """
    self._manager = manager if manager else TeamsBrowserManager()
    self._navigator = navigator if navigator else TeamsNavigator()
    self._pw = self._page = self._compose = None
    self._pending_message: Optional[str] = None