Function bodies 522 total
PlaywrightTeamsPoster._disconnect method · python · L58-L68 (11 LOC)browser/teams_poster.py
async def _disconnect(self) -> None:
    """Stop the Playwright handle and clear all per-message state.

    Only the client-side handle held in ``self._pw`` is stopped; this
    method is documented as NOT closing the browser itself.
    """
    playwright = self._pw
    if playwright is not None:
        try:
            await playwright.stop()
        except Exception:
            # Best effort: a failing stop() must not prevent state cleanup.
            pass
        self._pw = None
    # Forget the page, the compose box locator and any prepared message.
    self._page = self._compose = self._pending_message = None
async def _find_compose_box(page, timeout_ms: int = POST_TIMEOUT_MS):
    """Try each selector in :data:`COMPOSE_SELECTORS` with retries.

    The Teams SPA takes several seconds to render after navigation, so the
    selectors are probed repeatedly, waiting 2s between rounds.

    Args:
        page: Object exposing ``locator(selector)``; the returned locator
            must provide an awaitable ``count()``.
        timeout_ms: Retry budget in milliseconds. At least one probing
            round always runs, even when this is zero or negative.

    Returns:
        The first locator whose ``count()`` is positive, or ``None`` when
        the budget is exhausted.
    """
    interval_ms = 2_000
    elapsed = 0
    while True:
        for selector in COMPOSE_SELECTORS:
            locator = page.locator(selector)
            if await locator.count() > 0:
                return locator
        elapsed += interval_ms
        # Give up *before* sleeping: waiting after the last attempt would
        # only delay the failure without another probe.
        if elapsed >= timeout_ms:
            return None
        await asyncio.sleep(interval_ms / 1_000)
async def prepare_message(self, target: str, message: str) -> dict:
"""Phase 1: connect to browser, navigate to target, return confirmation.
Args:
target: Channel name or person name to search for in Teams.
message: The message text to post.
Returns a dict with ``"status"`` of ``"confirm_required"`` on
success (with ``"detected_channel"``, ``"message"``, and ``"target"``),
or ``"error"`` on failure.
"""
if not self._manager.is_alive():
return {
"status": "error",
"error": "Browser is not running. Call open_teams_browser first.",
}
# Clean up any leftover state from a previous prepare
if self.has_pending_message:
await self._disconnect()
try:
self._pw, browser = await self._manager.connect()
ctx = browser.contexts[0]
self._page = ctx.pages[0] if ctx.pages else await ctx.new_PlaywrightTeamsPoster.send_prepared_message method · python · L143-L179 (37 LOC)browser/teams_poster.py
async def send_prepared_message(self) -> dict:
"""Phase 2: type the pending message into the compose box and send.
Must be called after :meth:`prepare_message` returned
``"confirm_required"``. Disconnects from the browser after sending.
"""
if not self.has_pending_message:
return {
"status": "error",
"error": "No pending message. Call prepare_message first.",
}
try:
# Re-detect the channel in case user navigated during confirmation
detected = await TeamsNavigator._detect_channel_name(self._page)
await self._compose.click()
await self._compose.fill(self._pending_message)
await self._page.keyboard.press("Enter")
# Allow a moment for the message to dispatch
await self._page.wait_for_timeout(1_000)
result = {
"status": "sent",
"detected_channel": detecPlaywrightTeamsPoster.send_message method · python · L181-L190 (10 LOC)browser/teams_poster.py
async def send_message(self, target: str, message: str) -> dict:
    """One-shot send: prepare the message, then send it immediately.

    Chains :meth:`prepare_message` and :meth:`send_prepared_message`
    without waiting for a confirmation step.

    Args:
        target: Forwarded unchanged to :meth:`prepare_message`.
        message: Forwarded unchanged to :meth:`prepare_message`.

    Returns:
        The result of :meth:`send_prepared_message` when preparation
        reported ``"confirm_required"``; otherwise the preparation result
        itself.
    """
    prepared = await self.prepare_message(target, message)
    if prepared["status"] == "confirm_required":
        return await self.send_prepared_message()
    return prepared
async def cancel_prepared_message(self) -> dict:
    """Discard any prepared message and disconnect without sending.

    Returns:
        ``{"status": "cancelled"}`` when a message was pending, otherwise
        an error dict. The disconnect happens in both cases.
    """
    # Capture the flag first: _disconnect() clears the pending state.
    was_pending = self.has_pending_message
    await self._disconnect()
    if not was_pending:
        return {"status": "error", "error": "No pending message to cancel."}
    return {"status": "cancelled"}
def get_capability_names(include_unimplemented: bool = True) -> list[str]:
    """Return the registered capability names in alphabetical order.

    Args:
        include_unimplemented: When false, only names whose definition has
            a truthy ``implemented`` attribute are returned.
    """
    return sorted(
        name
        for name, definition in CAPABILITY_DEFINITIONS.items()
        if include_unimplemented or definition.implemented
    )
validate_capabilities function · python · L908-L930 (23 LOC)capabilities/registry.py
def validate_capabilities(capabilities: Iterable[str] | None) -> list[str]:
    """Validate capability names and return them normalized.

    Each entry is stripped of surrounding whitespace; blank or falsy
    entries are dropped. Duplicates are removed, keeping first-seen order.

    Args:
        capabilities: Names to validate; ``None`` yields an empty list.

    Raises:
        ValueError: If a non-blank name is not in ``CAPABILITY_DEFINITIONS``.
    """
    if capabilities is None:
        return []

    # A dict keeps insertion order, giving ordered de-duplication for free.
    ordered: dict[str, None] = {}
    for candidate in capabilities:
        cleaned = (candidate or "").strip()
        if not cleaned:
            continue
        if cleaned not in CAPABILITY_DEFINITIONS:
            known = ", ".join(get_capability_names(include_unimplemented=True))
            raise ValueError(f"Unknown capability '{cleaned}'. Valid capabilities: {known}")
        ordered.setdefault(cleaned, None)
    return list(ordered)
def get_tools_for_capabilities(capabilities: Iterable[str] | None) -> list[dict]:
    """Collect the tool schemas belonging to the given capabilities.

    Tool names without an entry in ``TOOL_SCHEMAS`` are silently ignored,
    and each tool is emitted at most once, in first-seen order. Schemas are
    deep-copied so callers cannot mutate the shared registry entries.

    Raises:
        ValueError: Propagated from :func:`validate_capabilities`.
    """
    collected: dict[str, dict] = {}
    for capability_name in validate_capabilities(capabilities):
        for tool_name in CAPABILITY_DEFINITIONS[capability_name].tool_names:
            if tool_name in collected:
                continue
            schema = TOOL_SCHEMAS.get(tool_name)
            if schema is not None:
                collected[tool_name] = deepcopy(schema)
    return list(collected.values())
def capability_prompt_lines(include_unimplemented: bool = True) -> list[str]:
    """Render one ``"name: description"`` line per capability for prompts.

    Capabilities whose definition is not implemented get the suffix
    ``" [legacy/no local tools]"``. Lines follow the order returned by
    :func:`get_capability_names`.
    """

    def _render(name: str) -> str:
        definition = CAPABILITY_DEFINITIONS[name]
        marker = " [legacy/no local tools]" if not definition.implemented else ""
        return f"{name}: {definition.description}{marker}"

    names = get_capability_names(include_unimplemented=include_unimplemented)
    return [_render(name) for name in names]
def normalize(self, raw_event: dict) -> InboundEvent:
    """Convert a raw iMessage dict into an ``InboundEvent``.

    Missing keys fall back to an empty string (``False`` for
    ``is_from_me``).
    """
    read = raw_event.get
    metadata = {
        "is_from_me": read("is_from_me", False),
        "chat_identifier": read("chat_identifier", ""),
    }
    return InboundEvent(
        channel="imessage",
        source=read("sender", ""),
        event_type="message",
        content=read("text", ""),
        metadata=metadata,
        received_at=read("date_local", ""),
        raw_id=read("guid", ""),
    )
def normalize(self, raw_event: dict) -> InboundEvent:
    """Convert a raw mail dict into an ``InboundEvent``.

    The content is the ``body`` when that key is present; header-only
    events (no ``body`` key) fall back to the ``subject``.
    """
    read = raw_event.get
    subject = read("subject", "")
    metadata = {
        "subject": subject,
        "read": read("read", False),
        "flagged": read("flagged", False),
        "mailbox": read("mailbox", ""),
        "account": read("account", ""),
        "to": read("to", []),
        "cc": read("cc", []),
    }
    return InboundEvent(
        channel="mail",
        source=read("sender", ""),
        event_type="email",
        content=read("body", subject),
        metadata=metadata,
        received_at=read("date", ""),
        raw_id=read("message_id", ""),
    )
def normalize(self, raw_event: dict) -> InboundEvent:
    """Convert a raw webhook dict into an ``InboundEvent``.

    The raw ``id`` is stringified for ``raw_id``; the webhook's own
    ``event_type`` is preserved in the metadata while the normalized event
    type is always ``"webhook_event"``.
    """
    read = raw_event.get
    metadata = {
        "status": read("status", ""),
        "event_type": read("event_type", ""),
    }
    return InboundEvent(
        channel="webhook",
        source=read("source", ""),
        event_type="webhook_event",
        content=read("payload", ""),
        metadata=metadata,
        received_at=read("received_at", ""),
        raw_id=str(read("id", "")),
    )
def adapt_event(channel: str, raw_event: dict) -> InboundEvent:
    """Normalize a raw event dict with the adapter registered for *channel*.

    Args:
        channel: Key into the ``_ADAPTERS`` registry.
        raw_event: Channel-specific event dict.

    Raises:
        ValueError: If no adapter is registered for *channel*.
    """
    adapter = _ADAPTERS.get(channel)
    if adapter is not None:
        return adapter.normalize(raw_event)
    known = sorted(_ADAPTERS)
    raise ValueError(f"Unknown channel: {channel!r}. Must be one of: {known}")
def log_event_handler(event: InboundEvent) -> dict:
    """Build a summary dict of *event* for logging or an audit trail.

    The content is reduced to a preview of at most 120 characters (an
    empty string when the event has no content).
    """
    body = event.content
    preview = body[:120] if body else ""
    summary = {"action": "logged"}
    for field in ("channel", "source", "event_type", "received_at", "raw_id"):
        summary[field] = getattr(event, field)
    summary["content_preview"] = preview
    return summary
priority_filter function · python · L21-L34 (14 LOC)channels/consumers.py
def priority_filter(event: InboundEvent) -> dict:
    """Flag events whose content contains any urgent keyword.

    Matching is a case-insensitive substring test of each entry in
    ``_URGENT_KEYWORDS`` against the event content.

    Returns:
        A dict with ``is_priority`` (bool) and ``matched_keywords`` (sorted
        list), plus the event's ``channel`` and ``raw_id``.
    """
    haystack = (event.content or "").lower()
    hits = sorted(keyword for keyword in _URGENT_KEYWORDS if keyword in haystack)
    return {
        "action": "priority_check",
        "is_priority": bool(hits),
        "matched_keywords": hits,
        "channel": event.channel,
        "raw_id": event.raw_id,
    }
def register_handler(self, event_type: str, handler: Callable[[InboundEvent], dict]) -> None:
    """Append *handler* to the handlers registered for *event_type*.

    Args:
        event_type: Event type to handle (e.g. "message", "email",
            "webhook_event").
        handler: Callable taking an ``InboundEvent`` and returning a dict.
    """
    # The registry is mutated only while holding the router lock.
    with self._lock:
        bucket = self._handlers[event_type]
        bucket.append(handler)
def route(self, event: InboundEvent) -> list[dict]:
    """Route *event* to every handler registered for its ``event_type``.

    The handler list is copied while holding the lock; the handlers
    themselves run after the lock has been released.

    Returns:
        One entry per handler, in registration order: the handler's result,
        or ``{"error": ..., "handler": ...}`` when the handler raised.
    """
    with self._lock:
        handlers = list(self._handlers.get(event.event_type, []))

    results = []
    for handler in handlers:
        try:
            results.append(handler(event))
        except Exception as exc:
            # Not every callable has __name__ (functools.partial, callable
            # instances); fall back to repr() so reporting a failure can
            # never itself raise and abort the remaining handlers.
            handler_name = getattr(handler, "__name__", None) or repr(handler)
            logger.warning("Handler %s failed for event %s: %s", handler_name, event.raw_id, exc)
            results.append({"error": str(exc), "handler": handler_name})
    return results
def is_sensitive_topic(content: str) -> bool:
    """Report whether *content* matches the sensitive-topic pattern.

    Args:
        content: The message body text to scan.

    Returns:
        True if ``_SENSITIVE_PATTERN`` finds a match; always False for
        empty or falsy content.
    """
    return bool(content) and _SENSITIVE_PATTERN.search(content) is not None
def is_work_hours(dt: Optional[datetime] = None) -> bool:
    """Check whether a datetime falls within standard work hours.

    Work hours are Monday through Friday, from ``_WORK_START_HOUR``
    (inclusive) up to ``_WORK_END_HOUR`` (exclusive).

    Args:
        dt: The datetime to check. Defaults to ``datetime.now()``.

    Returns:
        True if the datetime is within work hours.
    """
    moment = datetime.now() if dt is None else dt
    # weekday(): Monday=0 .. Sunday=6, so 5 and 6 are the weekend.
    is_weekday = moment.weekday() < 5
    return is_weekday and _WORK_START_HOUR <= moment.hour < _WORK_END_HOUR
def determine_safety_tier(
recipient_type: str,
sensitive: bool = False,
first_contact: bool = False,
override: Optional[str] = None,
) -> SafetyTier:
"""Determine the safety tier for an outbound message.
Logic:
1. If ``override`` is provided, return the corresponding tier immediately.
2. Start with the baseline tier for the recipient type.
3. If ``sensitive`` is True, bump up one tier (capped at DRAFT_ONLY).
4. If ``first_contact`` is True and recipient is not "self", set to DRAFT_ONLY.
Args:
recipient_type: One of "self", "internal", "external".
sensitive: Whether the message content is sensitive.
first_contact: Whether this is the first message to this recipient.
override: Force a specific tier: "auto", "confirm", or "draft_only".
Returns:
The determined SafetyTier.
Raises:
ValueError: If ``recipient_type`` or ``override`` is invalid.
"""
# Validate override first
if oselect_channel function · python · L208-L281 (74 LOC)channels/routing.py
def select_channel(
recipient_type: str,
urgency: str,
work_hours: Optional[bool] = None,
) -> str:
"""Select the best outbound channel for a message.
Channel selection matrix:
**Self:**
- urgent -> imessage
- informational -> email
- ephemeral -> notification
- (other) -> email
**Internal (work hours):**
- informal -> teams
- formal -> email
- urgent -> teams
- (other) -> email
**Internal (off hours):**
- urgent -> imessage
- (other) -> queued (deferred until work hours)
**External:**
- (always) -> email
Args:
recipient_type: One of "self", "internal", "external".
urgency: Message urgency/style: "urgent", "informational", "ephemeral",
"informal", "formal".
work_hours: Whether it is currently work hours. If None, auto-detected
via ``is_work_hours()``.
Returns:
Channel name string: "imessage", "email", "notification", "teams"parse_local_date_to_epoch function · python · L29-L35 (7 LOC)chief/imessage_daemon.py
def parse_local_date_to_epoch(date_local: str) -> int:
    """Parse ``YYYY-MM-DD HH:MM:SS`` in the local timezone to epoch seconds.

    Args:
        date_local: Wall-clock timestamp in the machine's local timezone.

    Returns:
        Whole seconds since the Unix epoch.

    Raises:
        ValueError: If *date_local* does not match the expected format.
    """
    naive = datetime.strptime(date_local, "%Y-%m-%d %H:%M:%S")
    # timestamp() on a naive datetime interprets it as local time using the
    # UTC offset in effect *on that date*. Attaching today's fixed offset
    # instead would shift results by an hour across a DST transition.
    return int(naive.timestamp())
compute_lookback_minutes function · python · L38-L52 (15 LOC)chief/imessage_daemon.py
def compute_lookback_minutes(
    watermark_epoch: int,
    now_epoch: int,
    bootstrap_minutes: int,
    max_minutes: int,
) -> int:
    """Return how many minutes back the ingest reader should look.

    Without a watermark (``watermark_epoch <= 0``) the bootstrap window is
    used. Otherwise the window is the watermark's age in whole minutes plus
    a 2-minute margin, floored at 1 and capped at *max_minutes*.
    """
    if watermark_epoch <= 0:
        return bootstrap_minutes
    # A watermark in the future counts as age zero.
    age_seconds = max(0, now_epoch - watermark_epoch)
    window = int(age_seconds / 60) + 2
    return min(max(window, 1), max_minutes)
def compute_dispatch_lookback_minutes(
    oldest_queued_epoch: int,
    now_epoch: int,
    max_minutes: int,
) -> int:
    """Return how many minutes back the dispatch step should look.

    With no usable timestamp (``oldest_queued_epoch <= 0``) the result is a
    fixed 5 minutes. Otherwise it is the oldest queued item's age in whole
    minutes plus a 3-minute margin, floored at 1 and capped at
    *max_minutes*.
    """
    if oldest_queued_epoch <= 0:
        return 5
    # An item stamped in the future counts as age zero.
    age_seconds = max(0, now_epoch - oldest_queued_epoch)
    window = int(age_seconds / 60) + 3
    return min(max(window, 1), max_minutes)
def __init__(self, db_path: Path):
    """Open (creating if needed) the SQLite state database at *db_path*.

    The parent directory is created when missing, rows are returned as
    ``sqlite3.Row`` objects, and the schema is initialized.
    """
    self.db_path = db_path
    parent_dir = self.db_path.parent
    parent_dir.mkdir(parents=True, exist_ok=True)
    connection = sqlite3.connect(self.db_path)
    connection.row_factory = sqlite3.Row
    self.conn = connection
    self._init_schema()
def _init_schema(self) -> None:
self.conn.executescript(
"""
CREATE TABLE IF NOT EXISTS message_events (
id INTEGER PRIMARY KEY AUTOINCREMENT,
guid TEXT NOT NULL UNIQUE,
text TEXT NOT NULL,
date_local TEXT NOT NULL,
timestamp_epoch INTEGER NOT NULL,
raw_json TEXT NOT NULL,
created_at_utc TEXT NOT NULL
);
CREATE TABLE IF NOT EXISTS processing_jobs (
id INTEGER PRIMARY KEY AUTOINCREMENT,
message_guid TEXT NOT NULL UNIQUE REFERENCES message_events(guid),
status TEXT NOT NULL CHECK(status IN ('queued', 'running', 'succeeded', 'failed')),
attempts INTEGER NOT NULL DEFAULT 0,
last_error TEXT,
created_at_utc TEXT NOT NULL,
updated_at_utc TEXT NOT NULL
);
CREATE TABLE IF NOT EXISTS outbounStateStore.get_watermark_epoch method · python · L147-L156 (10 LOC)chief/imessage_daemon.py
def get_watermark_epoch(self) -> int:
    """Return the stored ``last_message_epoch`` watermark, or 0.

    Zero is returned both when no watermark row exists and when the stored
    value cannot be converted to an integer.
    """
    record = self.conn.execute(
        "SELECT value FROM watermarks WHERE key = 'last_message_epoch'"
    ).fetchone()
    if record:
        try:
            return int(record["value"])
        except (TypeError, ValueError):
            # Unparseable value: treat it like a missing watermark.
            pass
    return 0
def set_watermark_epoch(self, epoch: int) -> None:
    """Insert or overwrite the ``last_message_epoch`` watermark and commit.

    The epoch is stored as text together with the current UTC timestamp.
    """
    upsert_sql = """
        INSERT INTO watermarks(key, value, updated_at_utc)
        VALUES('last_message_epoch', ?, ?)
        ON CONFLICT(key) DO UPDATE SET
        value = excluded.value,
        updated_at_utc = excluded.updated_at_utc
        """
    values = (str(epoch), utc_now_iso())
    self.conn.execute(upsert_sql, values)
    self.conn.commit()
def ingest_messages(self, messages: list[IngestedMessage]) -> tuple[int, int]:
inserted = 0
max_epoch = 0
now = utc_now_iso()
for msg in messages:
cur = self.conn.execute(
"""
INSERT OR IGNORE INTO message_events
(guid, text, date_local, timestamp_epoch, raw_json, created_at_utc)
VALUES (?, ?, ?, ?, ?, ?)
""",
(msg.guid, msg.text, msg.date_local, msg.timestamp_epoch, msg.raw_json, now),
)
if cur.rowcount == 1:
inserted += 1
self.conn.execute(
"""
INSERT OR IGNORE INTO processing_jobs
(message_guid, status, attempts, last_error, created_at_utc, updated_at_utc)
VALUES (?, 'queued', 0, NULL, ?, ?)
""",
(msg.guid, now, now),
)
if msg.timestamp_epocStateStore.list_queued_jobs method · python · L199-L218 (20 LOC)chief/imessage_daemon.py
def list_queued_jobs(self, limit: int) -> list[dict[str, Any]]:
    """Return up to *limit* queued jobs, oldest message first.

    Each job is joined with its message event and returned as a plain dict
    with the keys ``id``, ``message_guid``, ``status``, ``attempts``,
    ``timestamp_epoch``, ``date_local`` and ``text``.
    """
    query = """
        SELECT
        j.id,
        j.message_guid,
        j.status,
        j.attempts,
        e.timestamp_epoch,
        e.date_local,
        e.text
        FROM processing_jobs j
        JOIN message_events e ON e.guid = j.message_guid
        WHERE j.status = 'queued'
        ORDER BY e.timestamp_epoch ASC
        LIMIT ?
        """
    cursor = self.conn.execute(query, (limit,))
    return [dict(record) for record in cursor.fetchall()]
StateStore.mark_jobs_running method · python · L220-L237 (18 LOC)chief/imessage_daemon.py
def mark_jobs_running(self, job_ids: list[int]) -> None:
    """Flag the given jobs as ``running`` and bump their attempt counters.

    Does nothing (and does not commit) when *job_ids* is empty.
    """
    if job_ids:
        stamp = utc_now_iso()
        # One "?" per id; only placeholders are interpolated into the SQL,
        # the ids themselves are passed as bound parameters.
        placeholders = ",".join("?" * len(job_ids))
        self.conn.execute(
            f"""
            UPDATE processing_jobs
            SET status = 'running',
            attempts = attempts + 1,
            updated_at_utc = ?
            WHERE id IN ({placeholders})
            """,
            (stamp, *job_ids),
        )
        self.conn.commit()
def mark_job_result(self, message_guid: str, success: bool, error: str = "") -> None:
    """Record the final outcome of the job for *message_guid* and commit.

    Args:
        message_guid: GUID identifying the job row to update.
        success: True stores ``succeeded`` with a NULL error; False stores
            ``failed`` with the error text.
        error: Failure description, truncated to 1000 characters.
    """
    if success:
        status, stored_error = "succeeded", None
    else:
        status, stored_error = "failed", error[:1000]
    update_sql = """
        UPDATE processing_jobs
        SET status = ?, last_error = ?, updated_at_utc = ?
        WHERE message_guid = ?
        """
    self.conn.execute(update_sql, (status, stored_error, utc_now_iso(), message_guid))
    self.conn.commit()
def count_jobs_by_status(self, status: str) -> int:
    """Return the number of processing jobs whose status equals *status*."""
    cursor = self.conn.execute(
        "SELECT COUNT(*) AS c FROM processing_jobs WHERE status = ?",
        (status,),
    )
    record = cursor.fetchone()
    if not record:
        return 0
    return int(record["c"])
def run_forever(self) -> None:
    """Run daemon cycles in an endless loop, sleeping between cycles.

    A cycle that raises is logged with its traceback and the loop carries
    on. A summary line is logged only for cycles that ingested or
    dispatched at least one item.
    """
    logger.info(
        "Starting iMessage daemon (poll=%ss, db=%s)",
        self.config.poll_interval_seconds,
        self.config.state_db_path,
    )
    while True:
        try:
            outcome = self.run_once()
            had_activity = outcome["ingested"] > 0 or outcome["dispatched"] > 0
            if had_activity:
                logger.info(
                    "Cycle complete: ingested=%d dispatched=%d queued=%d failed=%d",
                    outcome["ingested"],
                    outcome["dispatched"],
                    self.store.count_jobs_by_status("queued"),
                    self.store.count_jobs_by_status("failed"),
                )
        except Exception:
            # Keep the daemon alive: one bad cycle must not end the loop.
            logger.exception("Daemon cycle failed")
        time.sleep(self.config.poll_interval_seconds)
def _ingest_cycle(self) -> int:
now_epoch = int(time.time())
watermark = self.store.get_watermark_epoch()
lookback = compute_lookback_minutes(
watermark_epoch=watermark,
now_epoch=now_epoch,
bootstrap_minutes=self.config.bootstrap_lookback_minutes,
max_minutes=self.config.max_lookback_minutes,
)
cmd = [str(self.config.reader_path), "--minutes", str(lookback)]
proc = subprocess.run(cmd, capture_output=True, text=True, check=False)
if proc.returncode != 0:
stderr = proc.stderr.strip()
logger.error("imessage-reader failed (code=%d): %s", proc.returncode, stderr)
return 0
raw = proc.stdout.strip() or "[]"
parsed = json.loads(raw)
if not isinstance(parsed, list):
raise ValueError("imessage-reader output was not a JSON array")
messages: list[IngestedMessage] = []
for row in parsed:
ifIMessageDaemon._dispatch_cycle method · python · L343-L379 (37 LOC)chief/imessage_daemon.py
def _dispatch_cycle(self) -> int:
queued = self.store.list_queued_jobs(limit=self.config.dispatch_batch_size)
if not queued:
return 0
job_ids = [int(row["id"]) for row in queued]
guids = [str(row["message_guid"]) for row in queued]
oldest_epoch = min(int(row["timestamp_epoch"]) for row in queued)
lookback = compute_dispatch_lookback_minutes(
oldest_queued_epoch=oldest_epoch,
now_epoch=int(time.time()),
max_minutes=self.config.max_lookback_minutes,
)
self.store.mark_jobs_running(job_ids)
cmd = [str(self.config.inbox_monitor_path), "--interval", str(lookback)]
proc = subprocess.run(cmd, capture_output=True, text=True, check=False)
if proc.returncode != 0:
err = (proc.stderr or proc.stdout or "inbox-monitor failed").strip()
for guid in guids:
self.store.mark_job_result(guid, success=False, error=err)
IMessageDaemon._load_processed_ids method · python · L381-L392 (12 LOC)chief/imessage_daemon.py
def _load_processed_ids(self) -> set[str]:
    """Load the set of processed message ids from the processed file.

    The file is expected to hold a JSON object with a ``processed_ids``
    list. Any other shape yields an empty set instead of raising.

    Returns:
        The ids as strings; an empty set when the file is missing, is not
        valid JSON, is not a JSON object, or ``processed_ids`` is not a
        list.
    """
    processed_file = self.config.processed_file
    if not processed_file.exists():
        return set()
    try:
        payload = json.loads(processed_file.read_text(encoding="utf-8"))
    except json.JSONDecodeError:
        logger.warning("Processed file is not valid JSON: %s", processed_file)
        return set()
    # Valid JSON may still be a list, string, number or null at top level;
    # only an object supports the key lookup below.
    if not isinstance(payload, dict):
        return set()
    values = payload.get("processed_ids", [])
    if not isinstance(values, list):
        return set()
    return {str(value) for value in values}
def __init__(
    self,
    router: ProviderRouter,
    ownership_db_path: Path,
    require_all_read_providers_success: bool = True,
):
    """Store the configuration and prepare the ownership database.

    Args:
        router: Provider router kept on the instance as ``router``.
        ownership_db_path: Location of the ownership database; coerced to
            a ``Path`` and its parent directory is created if missing.
        require_all_read_providers_success: Stored as a bool on the
            instance.
    """
    db_path = Path(ownership_db_path)
    self.router = router
    self.ownership_db_path = db_path
    self.require_all_read_providers_success = bool(require_all_read_providers_success)
    db_path.parent.mkdir(parents=True, exist_ok=True)
    self._init_ownership_db()
UnifiedCalendarService._init_ownership_db method · python · L35-L50 (16 LOC)connectors/calendar_unified.py
def _init_ownership_db(self) -> None:
    """Create the ``event_ownership`` table and its index if missing.

    Safe to call repeatedly: both statements use ``IF NOT EXISTS``.
    """
    schema_sql = """
        CREATE TABLE IF NOT EXISTS event_ownership (
        unified_uid TEXT PRIMARY KEY,
        provider TEXT NOT NULL,
        native_id TEXT NOT NULL,
        calendar_name TEXT,
        updated_at_utc TEXT NOT NULL
        );
        CREATE INDEX IF NOT EXISTS idx_event_ownership_native
        ON event_ownership(native_id, updated_at_utc DESC);
        """
    with self._open_ownership_db() as conn:
        conn.executescript(schema_sql)
        conn.commit()
def _upsert_ownership(self, event: dict) -> None:
unified_uid = str(event.get("unified_uid", "")).strip()
provider = normalize_provider_name(str(event.get("provider", "")))
native_id = str(event.get("native_id", "")).strip()
calendar_name = str(event.get("calendar", "") or event.get("calendar_id", "")).strip()
if not unified_uid or not provider or not native_id:
return
with self._open_ownership_db() as conn:
conn.execute(
"""
INSERT INTO event_ownership(unified_uid, provider, native_id, calendar_name, updated_at_utc)
VALUES(?, ?, ?, ?, ?)
ON CONFLICT(unified_uid) DO UPDATE SET
provider=excluded.provider,
native_id=excluded.native_id,
calendar_name=excluded.calendar_name,
updated_at_utc=excluded.updated_at_utc
""",
(unified_uid, provider, UnifiedCalendarService._delete_ownership method · python · L74-L79 (6 LOC)connectors/calendar_unified.py
def _delete_ownership(self, unified_uid: str) -> None:
    """Remove the ownership row for *unified_uid*; no-op for a falsy uid."""
    if unified_uid:
        with self._open_ownership_db() as conn:
            conn.execute(
                "DELETE FROM event_ownership WHERE unified_uid = ?",
                (unified_uid,),
            )
            conn.commit()
def _lookup_ownership(self, event_uid: str) -> tuple[str, str] | None:
    """Resolve *event_uid* to its ``(provider, native_id)`` pair.

    The uid is first matched against ``unified_uid``; if that fails it is
    treated as a native id and the most recently updated matching row
    wins.

    Returns:
        ``(provider, native_id)`` as strings, or ``None`` when the uid is
        blank or no row matches.
    """
    uid = (event_uid or "").strip()
    if not uid:
        return None

    # Tried in order; the first query that yields a row decides the result.
    lookups = (
        "SELECT provider, native_id FROM event_ownership WHERE unified_uid = ?",
        """
        SELECT provider, native_id
        FROM event_ownership
        WHERE native_id = ?
        ORDER BY updated_at_utc DESC
        LIMIT 1
        """,
    )
    with self._open_ownership_db() as conn:
        for sql in lookups:
            match = conn.execute(sql, (uid,)).fetchone()
            if match:
                return str(match["provider"]), str(match["native_id"])
    return None
def _tag_event(self, event: dict, provider_name: str) -> dict:
    """Return a copy of *event* annotated with provider identity fields.

    Sets ``provider`` and, when an id is available, ``native_id`` (taken
    from ``native_id`` or else ``uid``). ``calendar_id``,
    ``source_account`` and ``unified_uid`` are filled in only when they are
    missing or falsy. The input dict is not modified.

    Args:
        event: Provider event dict.
        provider_name: Provider label; the normalized form is used when
            normalization yields a truthy value, else the raw label.
    """
    tagged = dict(event)
    provider = normalize_provider_name(provider_name) or provider_name
    # Coalesce before str(): an explicit None would otherwise turn into the
    # literal id "None" and produce a bogus "<provider>:None" unified uid.
    native_id = str(tagged.get("native_id") or tagged.get("uid") or "").strip()
    if native_id:
        tagged["native_id"] = native_id
    tagged["provider"] = provider
    if not tagged.get("calendar_id"):
        tagged["calendar_id"] = str(tagged.get("calendar", "") or "")
    if not tagged.get("source_account"):
        tagged["source_account"] = str(tagged.get("source", "") or "")
    if not tagged.get("unified_uid"):
        tagged["unified_uid"] = f"{provider}:{native_id}" if native_id else ""
    return tagged
def _is_error_payload(payload: object) -> bool:
    """Report whether *payload* carries a truthy ``error`` entry.

    A dict is inspected directly; for a non-empty list only the first
    element is inspected (and only if it is a dict). Anything else is not
    an error payload.
    """
    candidate = payload
    if isinstance(payload, list):
        if not payload:
            return False
        candidate = payload[0]
    if isinstance(candidate, dict):
        return bool(candidate.get("error"))
    return False
def _build_dual_read_error(required: list[str], succeeded: list[str], rows: list[dict], errors: list[dict]) -> dict:
    """Build the error payload for a violated dual-read policy.

    Args:
        required: Providers that had to succeed.
        succeeded: Providers that did succeed.
        rows: Results gathered so far, reported as ``partial_results``.
        errors: Per-provider error details.

    Returns:
        A dict describing the failure; ``providers_failed`` lists the
        required providers missing from *succeeded*, in *required* order.
    """
    failed = list(filter(lambda name: name not in succeeded, required))
    report = {"error": "Dual-read policy requires all connected providers to succeed"}
    report["providers_required"] = required
    report["providers_succeeded"] = succeeded
    report["providers_failed"] = failed
    report["partial_results"] = rows
    report["provider_errors"] = errors
    return report
def _filter_source(self, rows: list[dict], source_filter: str) -> list[dict]:
    """Keep only rows whose source fields contain *source_filter*.

    The filter is a case-insensitive substring match against the row's
    ``provider``, ``source_account``, ``calendar`` and ``calendar_id``
    values joined by spaces. A blank filter returns *rows* unchanged (the
    same list object).
    """
    needle = (source_filter or "").strip().lower()
    if not needle:
        return rows

    fields = ("provider", "source_account", "calendar", "calendar_id")

    def _matches(row: dict) -> bool:
        haystack = " ".join(str(row.get(name, "") or "") for name in fields)
        return needle in haystack.lower()

    return [row for row in rows if _matches(row)]
UnifiedCalendarService._event_dedupe_key method · python · L161-L168 (8 LOC)connectors/calendar_unified.py
def _event_dedupe_key(event: dict) -> tuple:
    """Return the key used to detect duplicate events.

    Events with an ``ical_uid`` are keyed on it (case-insensitively);
    otherwise the key falls back to the lower-cased title plus the start
    and end values.

    Returns:
        ``("ical_uid", uid)`` or ``("fallback", title, start, end)``.
    """

    def _text(field: str) -> str:
        # Coalesce before str(): an explicit None must count as "missing",
        # not become the literal text "None". Otherwise every event with a
        # null ical_uid would share one key and be collapsed as duplicates.
        return str(event.get(field) or "").strip()

    ical_uid = _text("ical_uid")
    if ical_uid:
        return ("ical_uid", ical_uid.lower())
    return ("fallback", _text("title").lower(), _text("start"), _text("end"))
def _dedupe_events(self, rows: list[dict]) -> list[dict]:
    """Drop duplicate events and return the rest ordered by start.

    Duplicates are detected with :meth:`_event_dedupe_key`; the first
    occurrence of each key is kept. Ordering compares the stringified
    ``start`` values and keeps first-seen order for ties.
    """
    unique: dict[tuple, dict] = {}
    for row in rows:
        # setdefault leaves an existing entry untouched: first one wins.
        unique.setdefault(self._event_dedupe_key(row), row)
    return sorted(unique.values(), key=lambda item: str(item.get("start", "")))
def _provider_from_prefixed_uid(event_uid: str) -> tuple[str, str] | None:
    """Split a ``"<provider>:<native_id>"`` uid into its two parts.

    Only the first colon separates provider from id, so the native id may
    itself contain colons.

    Returns:
        ``(provider, native_id)`` with the provider normalized, or ``None``
        when there is no colon, the provider normalizes to a falsy value,
        or the native id is empty.
    """
    uid = (event_uid or "").strip()
    provider_token, separator, native_id = uid.partition(":")
    if not separator:
        return None
    provider = normalize_provider_name(provider_token)
    if provider and native_id:
        return provider, native_id
    return None