Function bodies 522 total
BaseExpertAgent.__init__ method · python · L22-L43 (22 LOC)agents/base.py
def __init__(
    self,
    config: AgentConfig,
    memory_store: MemoryStore,
    document_store: DocumentStore,
    client: Optional[anthropic.AsyncAnthropic] = None,
    calendar_store=None,
    reminder_store=None,
    notifier=None,
    mail_store=None,
    hook_registry=None,
):
    """Bind the agent to its configuration, stores and API client.

    Args:
        config: Agent configuration; its ``name`` becomes ``self.name``.
        memory_store: Store kept on ``self.memory_store``.
        document_store: Store kept on ``self.document_store``.
        client: Async Anthropic client to reuse. When omitted (or falsy) a
            new client is built from ``app_config.ANTHROPIC_API_KEY``.
        calendar_store: Optional calendar backend; ``None`` by default.
        reminder_store: Optional reminders backend; ``None`` by default.
        notifier: Optional notification backend; ``None`` by default.
        mail_store: Optional mail backend; ``None`` by default.
        hook_registry: Optional hook registry; ``None`` by default.
    """
    # Identity is taken directly from the configuration object.
    self.config = config
    self.name = config.name

    # Required stores.
    self.memory_store = memory_store
    self.document_store = document_store

    # Optional integrations are stored as given, including None.
    self.calendar_store = calendar_store
    self.reminder_store = reminder_store
    self.notifier = notifier
    self.mail_store = mail_store
    self.hook_registry = hook_registry

    # Only create a client when the caller did not supply one.
    if not client:
        client = anthropic.AsyncAnthropic(api_key=app_config.ANTHROPIC_API_KEY)
    self.client = client


# BaseExpertAgent.build_system_prompt method · python · L45-L73 (29 LOC)agents/base.py
def build_system_prompt(self) -> str:
prompt = self.config.system_prompt
# Inject runtime context: agent identity and current date
today = date.today().isoformat()
prompt += f"\n\n## Runtime Context\n- Agent name: {self.name}\n- Today's date: {today}"
try:
memories = self.memory_store.get_agent_memories(self.name)
except Exception:
memories = []
if memories:
lines = ["\n\n## Agent Memory (retained from previous runs)"]
for m in memories:
lines.append(f"- {m.key}: {m.value}")
prompt += "\n".join(lines)
# Inject shared namespace memories
for ns in getattr(self.config, "namespaces", []):
try:
shared = self.memory_store.get_shared_memories(ns)
except Exception:
shared = []
if shared:
lines = [f"\n\n## Shared Memory [{ns}]"]
for m in sharedBaseExpertAgent.execute method · python · L78-L134 (57 LOC)agents/base.py
async def execute(self, task: str) -> str:
messages = [{"role": "user", "content": task}]
tools = self.get_tools()
loop_detector = LoopDetector()
for _round in range(MAX_TOOL_ROUNDS):
response = await self._call_api(messages, tools)
# Check if the model wants to use a tool
if response.stop_reason == "tool_use":
# Process tool calls
assistant_content = response.content
messages.append({"role": "assistant", "content": assistant_content})
tool_results = []
should_break = False
for block in assistant_content:
if block.type == "tool_use":
signal = loop_detector.record(block.name, block.input)
if signal == "break":
should_break = True
tool_results.append({
"type": "tool_BaseExpertAgent._call_api method · python · L137-L150 (14 LOC)agents/base.py
async def _call_api(self, messages: list, tools: list) -> Any:
    """Issue one ``messages.create`` request and return its response.

    Args:
        messages: Conversation history to send.
        tools: Tool definitions; the ``tools`` field is only included in
            the request when this list is non-empty.

    Returns:
        Whatever ``self.client.messages.create`` returns.
    """
    tiers = app_config.MODEL_TIERS
    # Unknown tier names resolve to the configured default tier's model.
    model_id = tiers.get(self.config.model, tiers[app_config.DEFAULT_MODEL_TIER])

    request = dict(
        model=model_id,
        max_tokens=self.config.max_tokens,
        system=self.build_system_prompt(),
        messages=messages,
    )
    if tools:
        request["tools"] = tools

    return await self.client.messages.create(**request)


# BaseExpertAgent._fire_hooks method · python · L152-L159 (8 LOC)agents/base.py
def _fire_hooks(self, event_type: str, context: dict) -> list:
"""Fire hooks if a hook_registry is available. Error-isolated."""
if self.hook_registry is None:
return []
try:
return self.hook_registry.fire_hooks(event_type, context)
except Exception:
return []BaseExpertAgent._handle_tool_call method · python · L161-L205 (45 LOC)agents/base.py
def _handle_tool_call(self, tool_name: str, tool_input: dict) -> Any:
from hooks.registry import build_tool_context, extract_transformed_args
# Fire before_tool_call hooks
before_ctx = build_tool_context(
tool_name=tool_name,
tool_args=tool_input,
agent_name=self.name,
)
hook_results = self._fire_hooks("before_tool_call", before_ctx)
# Apply arg transformations from before_tool_call hooks
transformed = extract_transformed_args(hook_results)
if transformed is not None:
tool_input = transformed
# Enforce capability boundaries
allowed_tools = {t["name"] for t in self.get_tools()}
if tool_name not in allowed_tools:
result = {"error": f"Tool '{tool_name}' not permitted for agent '{self.name}'"}
after_ctx = build_tool_context(
tool_name=tool_name,
tool_args=tool_input,
agent_name=BaseExpertAgent._dispatch_tool method · python · L207-L315 (109 LOC)agents/base.py
def _dispatch_tool(self, tool_name: str, tool_input: dict) -> Any:
"""Route a tool call to the appropriate handler. Returns the result."""
if tool_name == "query_memory":
return execute_query_memory(
self.memory_store, tool_input["query"], tool_input.get("category")
)
elif tool_name == "store_memory":
return execute_store_memory(
self.memory_store,
tool_input["category"],
tool_input["key"],
tool_input["value"],
source=self.name,
)
elif tool_name == "search_documents":
return execute_search_documents(
self.document_store, tool_input["query"], tool_input.get("top_k", 5)
)
elif tool_name == "create_decision":
return self._handle_create_decision(tool_input)
elif tool_name == "search_decisions":
return self._handle_search_decisionsWant this analysis on your repo? https://repobility.com/scan/
BaseExpertAgent._handle_create_decision method · python · L317-L329 (13 LOC)agents/base.py
def _handle_create_decision(self, tool_input: dict) -> Any:
    """Create a decision via ``lifecycle_tools.create_decision``.

    Args:
        tool_input: Tool arguments. ``title`` is required (``KeyError`` when
            missing); every other field falls back to the default listed
            below.

    Returns:
        The value returned by ``lifecycle_tools.create_decision``.
    """
    # Fallback used for each optional field when the caller omits it.
    fallbacks = {
        "description": "",
        "context": "",
        "decided_by": "",
        "owner": "",
        "status": "pending_execution",
        "follow_up_date": "",
        "tags": "",
        "source": self.name,
    }
    optional_fields = {
        field: tool_input.get(field, fallback)
        for field, fallback in fallbacks.items()
    }
    return lifecycle_tools.create_decision(
        self.memory_store,
        title=tool_input["title"],
        **optional_fields,
    )


# BaseExpertAgent._handle_search_decisions method · python · L331-L336 (6 LOC)agents/base.py
def _handle_search_decisions(self, tool_input: dict) -> Any:
    """Search decisions via ``lifecycle_tools.search_decisions``.

    ``query`` and ``status`` are both optional and default to an empty
    string when absent from ``tool_input``.
    """
    filters = {field: tool_input.get(field, "") for field in ("query", "status")}
    return lifecycle_tools.search_decisions(self.memory_store, **filters)


# BaseExpertAgent._handle_update_decision method · python · L338-L344 (7 LOC)agents/base.py
def _handle_update_decision(self, tool_input: dict) -> Any:
    """Update a decision via ``lifecycle_tools.update_decision``.

    Args:
        tool_input: Must contain ``decision_id`` (``KeyError`` otherwise);
            ``status`` and ``notes`` default to an empty string.

    Returns:
        The value returned by ``lifecycle_tools.update_decision``.
    """
    changes = {field: tool_input.get(field, "") for field in ("status", "notes")}
    return lifecycle_tools.update_decision(
        self.memory_store,
        decision_id=tool_input["decision_id"],
        **changes,
    )


# BaseExpertAgent._handle_create_delegation method · python · L355-L364 (10 LOC)agents/base.py
def _handle_create_delegation(self, tool_input: dict) -> Any:
    """Create a delegation via ``lifecycle_tools.create_delegation``.

    Args:
        tool_input: ``task`` and ``delegated_to`` are required (``KeyError``
            when missing). ``description`` and ``due_date`` default to ``""``,
            ``priority`` to ``"medium"`` and ``source`` to this agent's name.

    Returns:
        The value returned by ``lifecycle_tools.create_delegation``.
    """
    fallbacks = {
        "description": "",
        "due_date": "",
        "priority": "medium",
        "source": self.name,
    }
    optional_fields = {
        field: tool_input.get(field, fallback)
        for field, fallback in fallbacks.items()
    }
    return lifecycle_tools.create_delegation(
        self.memory_store,
        task=tool_input["task"],
        delegated_to=tool_input["delegated_to"],
        **optional_fields,
    )


# BaseExpertAgent._handle_list_delegations method · python · L366-L371 (6 LOC)agents/base.py
def _handle_list_delegations(self, tool_input: dict) -> Any:
    """List delegations via ``lifecycle_tools.list_delegations``.

    ``status`` and ``delegated_to`` are optional filters that default to an
    empty string when absent from ``tool_input``.
    """
    status_filter = tool_input.get("status", "")
    assignee_filter = tool_input.get("delegated_to", "")
    return lifecycle_tools.list_delegations(
        self.memory_store,
        status=status_filter,
        delegated_to=assignee_filter,
    )


# BaseExpertAgent._handle_update_delegation method · python · L373-L379 (7 LOC)agents/base.py
def _handle_update_delegation(self, tool_input: dict) -> Any:
    """Update a delegation via ``lifecycle_tools.update_delegation``.

    Args:
        tool_input: Must contain ``delegation_id`` (``KeyError`` otherwise);
            ``status`` and ``notes`` default to an empty string.

    Returns:
        The value returned by ``lifecycle_tools.update_delegation``.
    """
    changes = {field: tool_input.get(field, "") for field in ("status", "notes")}
    return lifecycle_tools.update_delegation(
        self.memory_store,
        delegation_id=tool_input["delegation_id"],
        **changes,
    )


# BaseExpertAgent._handle_create_alert_rule method · python · L390-L398 (9 LOC)agents/base.py
def _handle_create_alert_rule(self, tool_input: dict) -> Any:
    """Create an alert rule via ``lifecycle_tools.create_alert_rule``.

    Args:
        tool_input: ``name`` and ``alert_type`` are required (``KeyError``
            when missing). ``description`` and ``condition`` default to
            ``""`` and ``enabled`` defaults to ``True``.

    Returns:
        The value returned by ``lifecycle_tools.create_alert_rule``.
    """
    rule = {
        "name": tool_input["name"],
        "alert_type": tool_input["alert_type"],
        "description": tool_input.get("description", ""),
        "condition": tool_input.get("condition", ""),
        "enabled": tool_input.get("enabled", True),
    }
    return lifecycle_tools.create_alert_rule(self.memory_store, **rule)


# BaseExpertAgent._handle_calendar_get_events method · python · L415-L428 (14 LOC)agents/base.py
def _handle_calendar_get_events(self, tool_input: dict) -> Any:
if self.calendar_store is None:
return {"error": "Calendar not available (macOS only)"}
from datetime import datetime
start_dt = datetime.fromisoformat(tool_input["start_date"])
end_dt = datetime.fromisoformat(tool_input["end_date"])
calendar_names = [tool_input["calendar_name"]] if tool_input.get("calendar_name") else None
return self.calendar_store.get_events(
start_dt,
end_dt,
calendar_names=calendar_names,
provider_preference=tool_input.get("provider_preference", "auto"),
source_filter=tool_input.get("source_filter", ""),
)Generated by Repobility's multi-pass static-analysis pipeline (https://repobility.com)
BaseExpertAgent._handle_calendar_search method · python · L430-L443 (14 LOC)agents/base.py
def _handle_calendar_search(self, tool_input: dict) -> Any:
if self.calendar_store is None:
return {"error": "Calendar not available (macOS only)"}
from datetime import datetime, timedelta
now = datetime.now()
start_dt = datetime.fromisoformat(tool_input["start_date"]) if tool_input.get("start_date") else now - timedelta(days=30)
end_dt = datetime.fromisoformat(tool_input["end_date"]) if tool_input.get("end_date") else now + timedelta(days=30)
return self.calendar_store.search_events(
tool_input["query"],
start_dt,
end_dt,
provider_preference=tool_input.get("provider_preference", "auto"),
source_filter=tool_input.get("source_filter", ""),
)BaseExpertAgent._handle_reminder_list method · python · L445-L451 (7 LOC)agents/base.py
def _handle_reminder_list(self, tool_input: dict) -> Any:
if self.reminder_store is None:
return {"error": "Reminders not available (macOS only)"}
return self.reminder_store.list_reminders(
list_name=tool_input.get("list_name"),
completed=tool_input.get("completed"),
)BaseExpertAgent._handle_reminder_search method · python · L453-L459 (7 LOC)agents/base.py
def _handle_reminder_search(self, tool_input: dict) -> Any:
if self.reminder_store is None:
return {"error": "Reminders not available (macOS only)"}
return self.reminder_store.search_reminders(
query=tool_input["query"],
include_completed=tool_input.get("include_completed", False),
)BaseExpertAgent._handle_reminder_create method · python · L461-L470 (10 LOC)agents/base.py
def _handle_reminder_create(self, tool_input: dict) -> Any:
if self.reminder_store is None:
return {"error": "Reminders not available (macOS only)"}
return self.reminder_store.create_reminder(
title=tool_input["title"],
list_name=tool_input.get("list_name"),
due_date=tool_input.get("due_date"),
priority=tool_input.get("priority"),
notes=tool_input.get("notes"),
)BaseExpertAgent._handle_send_notification method · python · L477-L485 (9 LOC)agents/base.py
def _handle_send_notification(self, tool_input: dict) -> Any:
if self.notifier is None:
return {"error": "Notifications not available (macOS only)"}
return self.notifier.send(
title=tool_input["title"],
message=tool_input["message"],
subtitle=tool_input.get("subtitle"),
sound=tool_input.get("sound", "default"),
)BaseExpertAgent._handle_mail_get_messages method · python · L487-L494 (8 LOC)agents/base.py
def _handle_mail_get_messages(self, tool_input: dict) -> Any:
if self.mail_store is None:
return {"error": "Mail not available (macOS only)"}
return self.mail_store.get_messages(
mailbox=tool_input.get("mailbox", "INBOX"),
account=tool_input.get("account", ""),
limit=tool_input.get("limit", 25),
)BaseExpertAgent._handle_mail_search method · python · L501-L509 (9 LOC)agents/base.py
def _handle_mail_search(self, tool_input: dict) -> Any:
if self.mail_store is None:
return {"error": "Mail not available (macOS only)"}
return self.mail_store.search_messages(
query=tool_input["query"],
mailbox=tool_input.get("mailbox", "INBOX"),
account=tool_input.get("account", ""),
limit=tool_input.get("limit", 25),
)BaseExpertAgent._handle_mail_get_unread_count method · python · L511-L522 (12 LOC)agents/base.py
def _handle_mail_get_unread_count(self, tool_input: dict) -> Any:
if self.mail_store is None:
return {"error": "Mail not available (macOS only)"}
mailbox = tool_input.get("mailbox", "INBOX")
account = tool_input.get("account", "")
mailboxes = self.mail_store.list_mailboxes()
for mb in mailboxes:
if isinstance(mb, dict) and mb.get("name") == mailbox:
if account and mb.get("account") != account:
continue
return {"mailbox": mailbox, "unread_count": mb.get("unread_count", 0)}
return {"mailbox": mailbox, "unread_count": 0}Citation: Repobility (2026). State of AI-Generated Code. https://repobility.com/research/
BaseExpertAgent._handle_mail_send method · python · L524-L537 (14 LOC)agents/base.py
def _handle_mail_send(self, tool_input: dict) -> Any:
if self.mail_store is None:
return {"error": "Mail not available (macOS only)"}
to_list = [a.strip() for a in tool_input["to"].split(",") if a.strip()]
cc_list = [a.strip() for a in tool_input.get("cc", "").split(",") if a.strip()] or None
bcc_list = [a.strip() for a in tool_input.get("bcc", "").split(",") if a.strip()] or None
return self.mail_store.send_message(
to=to_list,
subject=tool_input["subject"],
body=tool_input["body"],
cc=cc_list,
bcc=bcc_list,
confirm_send=False, # Agents must never auto-send; only MCP server (human-in-loop) can confirm
)BaseExpertAgent._handle_mail_mark_read method · python · L539-L545 (7 LOC)agents/base.py
def _handle_mail_mark_read(self, tool_input: dict) -> Any:
if self.mail_store is None:
return {"error": "Mail not available (macOS only)"}
return self.mail_store.mark_read(
message_id=tool_input["message_id"],
read=tool_input.get("read", True),
)BaseExpertAgent._handle_mail_mark_flagged method · python · L547-L553 (7 LOC)agents/base.py
def _handle_mail_mark_flagged(self, tool_input: dict) -> Any:
if self.mail_store is None:
return {"error": "Mail not available (macOS only)"}
return self.mail_store.mark_flagged(
message_id=tool_input["message_id"],
flagged=tool_input.get("flagged", True),
)BaseExpertAgent._handle_mail_move_message method · python · L555-L562 (8 LOC)agents/base.py
def _handle_mail_move_message(self, tool_input: dict) -> Any:
if self.mail_store is None:
return {"error": "Mail not available (macOS only)"}
return self.mail_store.move_message(
message_id=tool_input["message_id"],
target_mailbox=tool_input["target_mailbox"],
target_account=tool_input.get("target_account", ""),
)AgentFactory.create_agent method · python · L35-L63 (29 LOC)agents/factory.py
def create_agent(self, description: str) -> AgentConfig:
client = anthropic.Anthropic(api_key=app_config.ANTHROPIC_API_KEY)
response = client.messages.create(
model=app_config.MODEL_TIERS["haiku"],
max_tokens=1024,
system=AGENT_CREATION_PROMPT,
messages=[{"role": "user", "content": f"I need an agent for: {description}"}],
)
raw = response.content[0].text
data = json.loads(raw)
capabilities = validate_capabilities(data.get("capabilities", ["memory_read"]))
# Filter out restricted capabilities and cap count for auto-generated agents
capabilities = [c for c in capabilities if c not in RESTRICTED_CAPABILITIES]
capabilities = capabilities[:MAX_AUTO_CAPABILITIES]
config = AgentConfig(
name=data["name"],
description=data["description"],
system_prompt=data["system_prompt"],
capabilities=capabilities,
teLoopDetector.record method · python · L27-L45 (19 LOC)agents/loop_detector.py
def record(self, tool_name: str, tool_args: dict) -> str:
    """Register one tool call and classify it as 'ok', 'warning' or 'break'.

    The call is keyed with ``self._make_key``. Repetition of the same key is
    checked first (break threshold before warn threshold); otherwise an
    A-B-A-B alternation across the last four calls yields a warning.
    """
    call_key = self._make_key(tool_name, tool_args)
    self._counts[call_key] += 1
    self._history.append(call_key)

    repeats = self._counts[call_key]
    if repeats >= self.break_threshold:
        return "break"
    if repeats >= self.warn_threshold:
        return "warning"

    history = self._history
    if len(history) < 4:
        # Too few calls to form an A-B-A-B pattern.
        return "ok"

    newest, previous = history[-1], history[-2]
    alternating = (
        newest == history[-3]
        and previous == history[-4]
        and newest != previous
    )
    return "warning" if alternating else "ok"


# AgentRegistry._ensure_cache method · python · L36-L45 (10 LOC)agents/registry.py
def _ensure_cache(self) -> None:
"""Load all configs into cache on first access."""
if self._cache_loaded:
return
self._cache.clear()
for path in sorted(self.configs_dir.glob("*.yaml")):
config = self._load_yaml(path)
if config:
self._cache[config.name] = config
self._cache_loaded = TrueAgentRegistry._validate_name method · python · L56-L61 (6 LOC)agents/registry.py
def _validate_name(name: str) -> None:
    """Reject agent names that do not match ``VALID_AGENT_NAME``.

    Raises:
        ValueError: If ``name`` does not match the pattern.
    """
    if VALID_AGENT_NAME.match(name):
        return
    message = (
        f"Invalid agent name '{name}': must be lowercase alphanumeric, "
        "underscores, or hyphens, starting with a letter or digit"
    )
    raise ValueError(message)


# Methodology: Repobility · https://repobility.com/research/state-of-ai-code-2026/
AgentRegistry.save_agent method · python · L68-L90 (23 LOC)agents/registry.py
def save_agent(self, config: AgentConfig) -> Path:
    """Validate an agent config and write it to ``<name>.yaml``.

    The name is validated first, then the capabilities are normalised with
    ``validate_capabilities`` and written back onto ``config``. Optional
    fields (``namespaces``, ``created_by``, ``created_at``) are only written
    when truthy. The cache is invalidated after the file is written.

    Returns:
        Path of the YAML file that was written.
    """
    self._validate_name(config.name)

    capabilities = validate_capabilities(config.capabilities)
    config.capabilities = capabilities

    target = self.configs_dir / f"{config.name}.yaml"

    payload = {
        "name": config.name,
        "description": config.description,
        "system_prompt": config.system_prompt,
        "capabilities": capabilities,
        "temperature": config.temperature,
        "max_tokens": config.max_tokens,
        "model": config.model,
    }
    # Optional metadata is omitted from the file when unset or empty.
    for optional in ("namespaces", "created_by", "created_at"):
        value = getattr(config, optional)
        if value:
            payload[optional] = value

    target.write_text(yaml.dump(payload, default_flow_style=False))
    self._invalidate_cache()
    return target


# AgentRegistry._load_yaml method · python · L97-L114 (18 LOC)agents/registry.py
def _load_yaml(self, path: Path) -> Optional[AgentConfig]:
    """Parse one agent YAML file into an ``AgentConfig``.

    Args:
        path: Location of the YAML file to read.

    Returns:
        The parsed configuration, or ``None`` when the file is not valid
        YAML, is empty or not a mapping, lacks the required ``name`` key, or
        fails with a ``ValueError`` while being converted.
    """
    try:
        data = yaml.safe_load(path.read_text())
        # safe_load yields None for an empty document and a list/scalar for
        # non-mapping YAML. Without this guard data.get() would raise an
        # AttributeError that the handler below does not catch, so one blank
        # file would propagate an exception to the caller instead of simply
        # being skipped.
        if not isinstance(data, dict):
            return None
        capabilities = validate_capabilities(data.get("capabilities", []))
        return AgentConfig(
            name=data["name"],
            description=data.get("description", ""),
            system_prompt=data.get("system_prompt", ""),
            capabilities=capabilities,
            namespaces=data.get("namespaces", []),
            temperature=data.get("temperature", 0.3),
            max_tokens=data.get("max_tokens", 4096),
            model=data.get("model", "sonnet"),
            created_by=data.get("created_by"),
            created_at=data.get("created_at"),
        )
    except (yaml.YAMLError, KeyError, ValueError):
        return None


# classify_complexity function · python · L30-L62 (33 LOC)agents/triage.py
def classify_complexity(
task_text: str,
agent_config: AgentConfig,
client: Optional[anthropic.Anthropic] = None,
) -> str:
"""Classify task complexity using a Haiku pre-call.
Returns 'simple', 'standard', or 'complex'.
On any error, returns 'standard' (safe fallback).
"""
if client is None:
client = anthropic.Anthropic(api_key=app_config.ANTHROPIC_API_KEY)
prompt = _TRIAGE_PROMPT.format(
name=agent_config.name,
description=agent_config.description,
task=task_text[:1000],
)
try:
response = client.messages.create(
model=app_config.MODEL_TIERS["haiku"],
max_tokens=10,
messages=[{"role": "user", "content": prompt}],
)
text = response.content[0].text.strip().lower()
if text in _VALID_CLASSIFICATIONS:
return text
logger.warning("Triage returned unexpected value: %r, defaulting to standard", text)
return "standard"
classify_and_resolve function · python · L69-L93 (25 LOC)agents/triage.py
def classify_and_resolve(
    agent_config: AgentConfig,
    task_text: str,
    client: Optional[anthropic.Anthropic] = None,
) -> AgentConfig:
    """Pick the config to run a task with, downgrading simple tasks to haiku.

    Agents whose model tier is in ``_SKIP_TRIAGE_TIERS`` are returned
    untouched without classification. For all others the task is
    classified; only a ``"simple"`` result produces a copy of the config
    with ``model="haiku"``. The config passed in is never modified.
    """
    if agent_config.model in _SKIP_TRIAGE_TIERS:
        return agent_config

    verdict = classify_complexity(task_text, agent_config, client=client)
    if verdict != "simple":
        return agent_config

    logger.info(
        "Triage: agent=%s classification=simple, downgrading from %s to haiku",
        agent_config.name,
        agent_config.model,
    )
    return replace(agent_config, model="haiku")


# _event_to_dict function · python · L34-L58 (25 LOC)apple_calendar/eventkit.py
def _event_to_dict(event, calendar_name: str) -> dict:
"""Convert an EKEvent to a plain dict."""
attendees = []
if event.attendees():
for a in event.attendees():
attendees.append({
"name": str(a.name()) if a.name() else None,
"email": str(a.URL().resourceSpecifier()) if a.URL() else None,
"status": int(a.participantStatus()),
})
start = event.startDate()
end = event.endDate()
return {
"uid": str(event.calendarItemExternalIdentifier()),
"title": str(event.title()) if event.title() else "",
"start": datetime.fromtimestamp(start.timeIntervalSince1970()).isoformat() if start else None,
"end": datetime.fromtimestamp(end.timeIntervalSince1970()).isoformat() if end else None,
"location": str(event.location()) if event.location() else None,
"notes": str(event.notes()) if event.notes() else None,
"attendees": attendees,
"calendCalendarStore._ensure_store method · python · L76-L93 (18 LOC)apple_calendar/eventkit.py
def _ensure_store(self) -> Optional[dict]:
    """Create the event store on first use and make sure access is granted.

    Returns:
        ``None`` when the store is usable; ``_PLATFORM_ERROR`` when EventKit
        is unavailable; ``_PERMISSION_ERROR`` when access was not granted.
    """
    if not _EVENTKIT_AVAILABLE:
        return _PLATFORM_ERROR

    if self._store is None:
        self._store = EventKit.EKEventStore.alloc().init()

    # The access request is made once; its outcome is reused afterwards.
    if self._access_granted is None:
        self._access_granted = self._request_access()

    return None if self._access_granted else _PERMISSION_ERROR


# CalendarStore._request_access method · python · L95-L111 (17 LOC)apple_calendar/eventkit.py
def _request_access(self) -> bool:
"""Request calendar access synchronously.
Uses the completion-handler API and waits on a simple flag.
"""
import threading
granted_flag = threading.Event()
result = {"granted": False}
def handler(granted, error):
result["granted"] = granted
granted_flag.set()
self._store.requestFullAccessToEventsWithCompletion_(handler)
granted_flag.wait(timeout=30)
return result["granted"]CalendarStore._get_calendar_by_name method · python · L113-L133 (21 LOC)apple_calendar/eventkit.py
def _get_calendar_by_name(self, name: str, source: Optional[str] = None):
    """Return the first calendar whose title matches ``name``, else ``None``.

    The lower-cased, stripped name is first looked up in
    ``config.CALENDAR_ALIASES``; a hit replaces ``name`` and supplies a
    ``source`` when the caller gave none. When a source is known and the
    calendar exposes one, the source titles must match case-insensitively,
    which separates calendars sharing a display name (e.g. iCloud
    "Calendar" and Exchange "Calendar").
    """
    from config import CALENDAR_ALIASES

    lookup_key = (name or "").strip().lower()
    alias = CALENDAR_ALIASES.get(lookup_key)
    if alias:
        name = alias["name"]
        source = source or alias.get("source")

    def source_matches(calendar) -> bool:
        # With no requested source, or no source on the calendar, accept it.
        if not (source and calendar.source()):
            return True
        return str(calendar.source().title()).lower() == source.lower()

    for calendar in self._store.calendarsForEntityType_(0):  # 0 = Event
        if calendar.title() == name and source_matches(calendar):
            return calendar
    return None


# Want this analysis on your repo? https://repobility.com/scan/
CalendarStore._find_event_by_uid method · python · L135-L156 (22 LOC)apple_calendar/eventkit.py
def _find_event_by_uid(self, uid: str, calendar_name: Optional[str] = None):
"""Find a single EKEvent by its external identifier."""
event = self._store.calendarItemWithIdentifier_(uid)
if event is not None:
return event
# Fallback: search recent range for the UID
from datetime import timedelta
now = datetime.now()
start = _ns_date(now - timedelta(days=365 * 2))
end = _ns_date(now + timedelta(days=365 * 2))
calendars = None
if calendar_name:
cal = self._get_calendar_by_name(calendar_name)
if cal:
calendars = [cal]
predicate = self._store.predicateForEventsWithStartDate_endDate_calendars_(
start, end, calendars
)
for ev in self._store.eventsMatchingPredicate_(predicate):
if str(ev.calendarItemExternalIdentifier()) == uid:
return ev
return NoneCalendarStore.list_calendars method · python · L162-L193 (32 LOC)apple_calendar/eventkit.py
def list_calendars(self) -> list[dict]:
"""Return all calendars with name, type, source, and color."""
err = self._ensure_store()
if err:
return [err]
try:
calendars = self._store.calendarsForEntityType_(0) # 0 = Event
result = []
for cal in calendars:
color = cal.color()
color_hex = None
if color:
r = int(color.redComponent() * 255)
g = int(color.greenComponent() * 255)
b = int(color.blueComponent() * 255)
color_hex = f"#{r:02x}{g:02x}{b:02x}"
cal_type_int = cal.type()
cal_type_map = {0: "local", 1: "calDAV", 2: "exchange", 3: "subscription", 4: "birthday"}
cal_type = cal_type_map.get(cal_type_int, f"unknown({cal_type_int})")
result.append({
"name": str(cal.title()),
"tyCalendarStore.get_events method · python · L195-L229 (35 LOC)apple_calendar/eventkit.py
def get_events(
self,
start_dt: datetime,
end_dt: datetime,
calendar_names: Optional[list[str]] = None,
) -> list[dict]:
"""Return events in the given date range."""
err = self._ensure_store()
if err:
return [err]
try:
calendars = None
if calendar_names:
calendars = []
for name in calendar_names:
cal = self._get_calendar_by_name(name)
if cal:
calendars.append(cal)
if not calendars:
return [{"error": f"No matching calendars found for: {calendar_names}"}]
predicate = self._store.predicateForEventsWithStartDate_endDate_calendars_(
_ns_date(start_dt), _ns_date(end_dt), calendars
)
events = self._store.eventsMatchingPredicate_(predicate)
if events is None:
return []
CalendarStore.create_event method · python · L231-L274 (44 LOC)apple_calendar/eventkit.py
def create_event(
self,
title: str,
start_dt: datetime,
end_dt: datetime,
calendar_name: Optional[str] = None,
location: Optional[str] = None,
notes: Optional[str] = None,
is_all_day: bool = False,
) -> dict:
"""Create a calendar event and return its dict representation."""
err = self._ensure_store()
if err:
return err
try:
event = EventKit.EKEvent.eventWithEventStore_(self._store)
event.setTitle_(title)
event.setStartDate_(_ns_date(start_dt))
event.setEndDate_(_ns_date(end_dt))
event.setAllDay_(is_all_day)
if location:
event.setLocation_(location)
if notes:
event.setNotes_(notes)
if calendar_name:
cal = self._get_calendar_by_name(calendar_name)
if cal:
event.setCalendar_(cal)
CalendarStore.update_event method · python · L276-L308 (33 LOC)apple_calendar/eventkit.py
def update_event(self, event_uid: str, calendar_name: Optional[str] = None, **kwargs) -> dict:
"""Update an existing event by UID. Supported kwargs: title, start_dt,
end_dt, location, notes, is_all_day."""
err = self._ensure_store()
if err:
return err
try:
event = self._find_event_by_uid(event_uid, calendar_name)
if event is None:
return {"error": f"Event not found: {event_uid}"}
if "title" in kwargs:
event.setTitle_(kwargs["title"])
if "start_dt" in kwargs:
event.setStartDate_(_ns_date(kwargs["start_dt"]))
if "end_dt" in kwargs:
event.setEndDate_(_ns_date(kwargs["end_dt"]))
if "location" in kwargs:
event.setLocation_(kwargs["location"])
if "notes" in kwargs:
event.setNotes_(kwargs["notes"])
if "is_all_day" in kwargs:
event.CalendarStore.delete_event method · python · L310-L327 (18 LOC)apple_calendar/eventkit.py
def delete_event(self, event_uid: str, calendar_name: Optional[str] = None) -> dict:
    """Remove the event identified by ``event_uid``.

    Returns:
        ``{"status": "deleted", "event_uid": ...}`` on success. Otherwise an
        error dict: the one from ``_ensure_store``, "Event not found", or
        "Failed to delete event" when removal fails or raises.
    """
    unavailable = self._ensure_store()
    if unavailable:
        return unavailable

    try:
        target = self._find_event_by_uid(event_uid, calendar_name)
        if target is None:
            return {"error": f"Event not found: {event_uid}"}

        removed, failure = self._store.removeEvent_span_error_(target, 0, None)
        if removed:
            return {"status": "deleted", "event_uid": event_uid}
        return {"error": f"Failed to delete event: {failure}"}
    except (AttributeError, TypeError, RuntimeError) as e:
        logger.error("PyObjC error deleting event: %s", e)
        return {"error": f"Failed to delete event: {e}"}


# CalendarStore.search_events method · python · L329-L357 (29 LOC)apple_calendar/eventkit.py
def search_events(
self,
query: str,
start_dt: datetime,
end_dt: datetime,
) -> list[dict]:
"""Search events by title text within a date range."""
err = self._ensure_store()
if err:
return [err]
try:
predicate = self._store.predicateForEventsWithStartDate_endDate_calendars_(
_ns_date(start_dt), _ns_date(end_dt), None
)
events = self._store.eventsMatchingPredicate_(predicate)
if events is None:
return []
query_lower = query.lower()
results = []
for ev in events:
title = str(ev.title()) if ev.title() else ""
if query_lower in title.lower():
results.append(_event_to_dict(ev, str(ev.calendar().title())))
return results
except (AttributeError, TypeError, RuntimeError) as e:
logger.error("PyObjC error searching_escape_osascript function · python · L19-L27 (9 LOC)apple_mail/mail.py
def _escape_osascript(text: str) -> str:
"""Escape text for safe use in AppleScript strings."""
return (
text.replace("\\", "\\\\")
.replace('"', '\\"')
.replace("\n", "\\n")
.replace("\r", "\\r")
.replace("\t", "\\t")
)Generated by Repobility's multi-pass static-analysis pipeline (https://repobility.com)
_run_applescript function · python · L30-L45 (16 LOC)apple_mail/mail.py
def _run_applescript(script: str, timeout: int = _DEFAULT_TIMEOUT) -> dict:
    """Run ``script`` through ``osascript`` and report the outcome as a dict.

    Returns:
        ``{"output": <stripped stdout>}`` on success; ``_PLATFORM_ERROR``
        when not on macOS; otherwise ``{"error": ...}`` describing a
        timeout, a non-zero exit (with stripped stderr), or a missing
        ``osascript`` binary.
    """
    if not _IS_MACOS:
        return _PLATFORM_ERROR

    command = ["osascript", "-e", script]
    try:
        # check=True turns a non-zero exit status into CalledProcessError.
        completed = subprocess.run(
            command,
            capture_output=True,
            text=True,
            timeout=timeout,
            check=True,
        )
    except subprocess.TimeoutExpired:
        return {"error": "Mail operation timed out"}
    except subprocess.CalledProcessError as e:
        return {"error": f"osascript failed: {e.stderr.strip()}"}
    except FileNotFoundError:
        return {"error": "osascript not found — not running on macOS?"}
    return {"output": completed.stdout.strip()}


# MailStore.list_mailboxes method · python · L65-L94 (30 LOC)apple_mail/mail.py
def list_mailboxes(self) -> list[dict]:
"""List all mailboxes across all accounts with unread counts."""
script = '''
tell application "Mail"
set output to ""
repeat with acct in accounts
set acctName to name of acct
repeat with mb in mailboxes of acct
set mbName to name of mb
set unread to unread count of mb
set output to output & mbName & "|||" & acctName & "|||" & (unread as text) & return & "~~~RECORD~~~" & return
end repeat
end repeat
return output
end tell
'''
result = _run_applescript(script)
if "error" in result:
return [result]
records = _parse_records(result["output"])
mailboxes = []
for rec in records:
fields = _parse_fields(rec)
if len(fields) >= 3:
mailboxes.append({
"name": fields[0],
"account": fields[1],
"unread_count": int(fiMailStore.get_messages method · python · L96-L144 (49 LOC)apple_mail/mail.py
def get_messages(self, mailbox: str = "INBOX", account: str = "", limit: int = 25) -> list[dict]:
"""Get recent messages (headers only) from a mailbox. Limit capped at 100."""
limit = min(max(1, limit), 100)
mailbox_esc = _escape_osascript(mailbox)
if account:
account_esc = _escape_osascript(account)
mailbox_ref = f'mailbox "{mailbox_esc}" of account "{account_esc}"'
else:
mailbox_ref = f'mailbox "{mailbox_esc}" of account 1'
script = f'''
tell application "Mail"
set mb to {mailbox_ref}
set msgCount to count of messages of mb
if msgCount is 0 then return ""
set maxMsg to msgCount
if maxMsg > {limit} then set maxMsg to {limit}
set output to ""
repeat with i from 1 to maxMsg
set msg to message i of mb
set mid to message id of msg
set subj to subject of msg
set sndr to sender of msg
set dt to date sent of msg as text
set isReadpage 1 / 11next ›