← back to jrichyrich__chief-of-staff

Function bodies 522 total

All specs Real LLM only Function bodies
PlaywrightTeamsPoster._disconnect method · python · L58-L68 (11 LOC)
browser/teams_poster.py
    async def _disconnect(self) -> None:
        """Disconnect from browser (does NOT close the browser)."""
        if self._pw is not None:
            try:
                await self._pw.stop()
            except Exception:
                pass
        self._pw = None
        self._page = None
        self._compose = None
        self._pending_message = None
PlaywrightTeamsPoster._find_compose_box method · python · L71-L87 (17 LOC)
browser/teams_poster.py
    async def _find_compose_box(page, timeout_ms: int = POST_TIMEOUT_MS):
        """Try each selector in :data:`COMPOSE_SELECTORS` with retries.

        The Teams SPA takes several seconds to render after navigation.
        Retries every 2s up to *timeout_ms* before giving up.
        Returns the first locator that matches, or ``None``.
        """
        elapsed = 0
        interval_ms = 2_000
        while elapsed < timeout_ms:
            for selector in COMPOSE_SELECTORS:
                locator = page.locator(selector)
                if await locator.count() > 0:
                    return locator
            await asyncio.sleep(interval_ms / 1_000)
            elapsed += interval_ms
        return None
PlaywrightTeamsPoster.prepare_message method · python · L89-L141 (53 LOC)
browser/teams_poster.py
    async def prepare_message(self, target: str, message: str) -> dict:
        """Phase 1: connect to browser, navigate to target, return confirmation.

        Args:
            target: Channel name or person name to search for in Teams.
            message: The message text to post.

        Returns a dict with ``"status"`` of ``"confirm_required"`` on
        success (with ``"detected_channel"``, ``"message"``, and ``"target"``),
        or ``"error"`` on failure.
        """
        if not self._manager.is_alive():
            return {
                "status": "error",
                "error": "Browser is not running. Call open_teams_browser first.",
            }

        # Clean up any leftover state from a previous prepare
        if self.has_pending_message:
            await self._disconnect()

        try:
            self._pw, browser = await self._manager.connect()
            ctx = browser.contexts[0]
            self._page = ctx.pages[0] if ctx.pages else await ctx.new_
PlaywrightTeamsPoster.send_prepared_message method · python · L143-L179 (37 LOC)
browser/teams_poster.py
    async def send_prepared_message(self) -> dict:
        """Phase 2: type the pending message into the compose box and send.

        Must be called after :meth:`prepare_message` returned
        ``"confirm_required"``.  Disconnects from the browser after sending.
        """
        if not self.has_pending_message:
            return {
                "status": "error",
                "error": "No pending message. Call prepare_message first.",
            }

        try:
            # Re-detect the channel in case user navigated during confirmation
            detected = await TeamsNavigator._detect_channel_name(self._page)

            await self._compose.click()
            await self._compose.fill(self._pending_message)
            await self._page.keyboard.press("Enter")

            # Allow a moment for the message to dispatch
            await self._page.wait_for_timeout(1_000)

            result = {
                "status": "sent",
                "detected_channel": detec
PlaywrightTeamsPoster.send_message method · python · L181-L190 (10 LOC)
browser/teams_poster.py
    async def send_message(self, target: str, message: str) -> dict:
        """One-shot: navigate to target, type message, send, disconnect.

        Combines :meth:`prepare_message` and :meth:`send_prepared_message`
        into a single call without requiring confirmation.
        """
        result = await self.prepare_message(target, message)
        if result["status"] != "confirm_required":
            return result
        return await self.send_prepared_message()
PlaywrightTeamsPoster.cancel_prepared_message method · python · L192-L198 (7 LOC)
browser/teams_poster.py
    async def cancel_prepared_message(self) -> dict:
        """Cancel a prepared message and disconnect without sending."""
        had_pending = self.has_pending_message
        await self._disconnect()
        if had_pending:
            return {"status": "cancelled"}
        return {"status": "error", "error": "No pending message to cancel."}
get_capability_names function · python · L899-L905 (7 LOC)
capabilities/registry.py
def get_capability_names(include_unimplemented: bool = True) -> list[str]:
    """Return known capability names sorted alphabetically."""
    items = []
    for name, definition in CAPABILITY_DEFINITIONS.items():
        if include_unimplemented or definition.implemented:
            items.append(name)
    return sorted(items)
Powered by Repobility — scan your code at https://repobility.com
validate_capabilities function · python · L908-L930 (23 LOC)
capabilities/registry.py
def validate_capabilities(capabilities: Iterable[str] | None) -> list[str]:
    """Validate and normalize capability names.

    Returns a de-duplicated list preserving first-seen order.
    """
    if capabilities is None:
        return []

    seen: set[str] = set()
    normalized: list[str] = []

    for raw in capabilities:
        name = (raw or "").strip()
        if not name:
            continue
        if name not in CAPABILITY_DEFINITIONS:
            valid = ", ".join(get_capability_names(include_unimplemented=True))
            raise ValueError(f"Unknown capability '{name}'. Valid capabilities: {valid}")
        if name not in seen:
            normalized.append(name)
            seen.add(name)

    return normalized
get_tools_for_capabilities function · python · L939-L959 (21 LOC)
capabilities/registry.py
def get_tools_for_capabilities(capabilities: Iterable[str] | None) -> list[dict]:
    """Return tool schemas for the given capabilities.

    Capabilities without runtime tool mappings are treated as no-op and ignored.
    """
    validated = validate_capabilities(capabilities)
    tools: list[dict] = []
    seen_tool_names: set[str] = set()

    for capability_name in validated:
        definition = CAPABILITY_DEFINITIONS[capability_name]
        for tool_name in definition.tool_names:
            if tool_name in seen_tool_names:
                continue
            schema = TOOL_SCHEMAS.get(tool_name)
            if schema is None:
                continue
            tools.append(deepcopy(schema))
            seen_tool_names.add(tool_name)

    return tools
capability_prompt_lines function · python · L962-L969 (8 LOC)
capabilities/registry.py
def capability_prompt_lines(include_unimplemented: bool = True) -> list[str]:
    """Return capability descriptions formatted for prompt text."""
    lines: list[str] = []
    for name in get_capability_names(include_unimplemented=include_unimplemented):
        definition = CAPABILITY_DEFINITIONS[name]
        suffix = "" if definition.implemented else " [legacy/no local tools]"
        lines.append(f"{name}: {definition.description}{suffix}")
    return lines
IMessageAdapter.normalize method · python · L20-L32 (13 LOC)
channels/adapter.py
    def normalize(self, raw_event: dict) -> InboundEvent:
        return InboundEvent(
            channel="imessage",
            source=raw_event.get("sender", ""),
            event_type="message",
            content=raw_event.get("text", ""),
            metadata={
                "is_from_me": raw_event.get("is_from_me", False),
                "chat_identifier": raw_event.get("chat_identifier", ""),
            },
            received_at=raw_event.get("date_local", ""),
            raw_id=raw_event.get("guid", ""),
        )
MailAdapter.normalize method · python · L38-L57 (20 LOC)
channels/adapter.py
    def normalize(self, raw_event: dict) -> InboundEvent:
        # Full message (has 'body') vs header-only (has 'subject' only)
        content = raw_event.get("body", raw_event.get("subject", ""))
        return InboundEvent(
            channel="mail",
            source=raw_event.get("sender", ""),
            event_type="email",
            content=content,
            metadata={
                "subject": raw_event.get("subject", ""),
                "read": raw_event.get("read", False),
                "flagged": raw_event.get("flagged", False),
                "mailbox": raw_event.get("mailbox", ""),
                "account": raw_event.get("account", ""),
                "to": raw_event.get("to", []),
                "cc": raw_event.get("cc", []),
            },
            received_at=raw_event.get("date", ""),
            raw_id=raw_event.get("message_id", ""),
        )
WebhookAdapter.normalize method · python · L63-L75 (13 LOC)
channels/adapter.py
    def normalize(self, raw_event: dict) -> InboundEvent:
        return InboundEvent(
            channel="webhook",
            source=raw_event.get("source", ""),
            event_type="webhook_event",
            content=raw_event.get("payload", ""),
            metadata={
                "status": raw_event.get("status", ""),
                "event_type": raw_event.get("event_type", ""),
            },
            received_at=raw_event.get("received_at", ""),
            raw_id=str(raw_event.get("id", "")),
        )
adapt_event function · python · L85-L98 (14 LOC)
channels/adapter.py
def adapt_event(channel: str, raw_event: dict) -> InboundEvent:
    """Factory function: normalize a raw event dict using the appropriate adapter.

    Args:
        channel: One of "imessage", "mail", "webhook"
        raw_event: Channel-specific event dict

    Raises:
        ValueError: If channel is not recognized
    """
    adapter = _ADAPTERS.get(channel)
    if adapter is None:
        raise ValueError(f"Unknown channel: {channel!r}. Must be one of: {sorted(_ADAPTERS)}")
    return adapter.normalize(raw_event)
log_event_handler function · python · L8-L18 (11 LOC)
channels/consumers.py
def log_event_handler(event: InboundEvent) -> dict:
    """Return a summary dict suitable for logging/audit trail."""
    return {
        "action": "logged",
        "channel": event.channel,
        "source": event.source,
        "event_type": event.event_type,
        "received_at": event.received_at,
        "raw_id": event.raw_id,
        "content_preview": event.content[:120] if event.content else "",
    }
All rows scored by the Repobility analyzer (https://repobility.com)
priority_filter function · python · L21-L34 (14 LOC)
channels/consumers.py
def priority_filter(event: InboundEvent) -> dict:
    """Flag events whose content contains urgent keywords.

    Returns a dict with 'is_priority' bool and 'matched_keywords' list.
    """
    content_lower = event.content.lower() if event.content else ""
    matched = [kw for kw in _URGENT_KEYWORDS if kw in content_lower]
    return {
        "action": "priority_check",
        "is_priority": bool(matched),
        "matched_keywords": sorted(matched),
        "channel": event.channel,
        "raw_id": event.raw_id,
    }
EventRouter.register_handler method · python · L23-L31 (9 LOC)
channels/router.py
    def register_handler(self, event_type: str, handler: Callable[[InboundEvent], dict]) -> None:
        """Register a handler for a specific event type.

        Args:
            event_type: The event type to handle (e.g. "message", "email", "webhook_event")
            handler: Callable that takes an InboundEvent and returns a dict result
        """
        with self._lock:
            self._handlers[event_type].append(handler)
EventRouter.route method · python · L33-L50 (18 LOC)
channels/router.py
    def route(self, event: InboundEvent) -> list[dict]:
        """Route an event to all registered handlers for its event_type.

        Returns a list of result dicts, one per handler. If a handler raises,
        the error is caught and included as an error dict in the results.
        """
        with self._lock:
            handlers = list(self._handlers.get(event.event_type, []))

        results = []
        for handler in handlers:
            try:
                result = handler(event)
                results.append(result)
            except Exception as exc:
                logger.warning("Handler %s failed for event %s: %s", handler.__name__, event.raw_id, exc)
                results.append({"error": str(exc), "handler": handler.__name__})
        return results
is_sensitive_topic function · python · L98-L112 (15 LOC)
channels/routing.py
def is_sensitive_topic(content: str) -> bool:
    """Detect whether message content contains sensitive topics.

    Uses keyword matching (case-insensitive, word-boundary aware) to flag
    messages that may require higher safety tiers.

    Args:
        content: The message body text to scan.

    Returns:
        True if any sensitive keyword is found in the content.
    """
    if not content:
        return False
    return bool(_SENSITIVE_PATTERN.search(content))
is_work_hours function · python · L124-L140 (17 LOC)
channels/routing.py
def is_work_hours(dt: Optional[datetime] = None) -> bool:
    """Check whether the given datetime falls within standard work hours.

    Work hours are defined as Monday-Friday, 09:00-17:59 local time.

    Args:
        dt: The datetime to check. Defaults to ``datetime.now()`` if not provided.

    Returns:
        True if the datetime is within work hours.
    """
    if dt is None:
        dt = datetime.now()
    # weekday(): Monday=0 .. Sunday=6
    if dt.weekday() >= 5:
        return False
    return _WORK_START_HOUR <= dt.hour < _WORK_END_HOUR
determine_safety_tier function · python · L148-L200 (53 LOC)
channels/routing.py
def determine_safety_tier(
    recipient_type: str,
    sensitive: bool = False,
    first_contact: bool = False,
    override: Optional[str] = None,
) -> SafetyTier:
    """Determine the safety tier for an outbound message.

    Logic:
    1. If ``override`` is provided, return the corresponding tier immediately.
    2. Start with the baseline tier for the recipient type.
    3. If ``sensitive`` is True, bump up one tier (capped at DRAFT_ONLY).
    4. If ``first_contact`` is True and recipient is not "self", set to DRAFT_ONLY.

    Args:
        recipient_type: One of "self", "internal", "external".
        sensitive: Whether the message content is sensitive.
        first_contact: Whether this is the first message to this recipient.
        override: Force a specific tier: "auto", "confirm", or "draft_only".

    Returns:
        The determined SafetyTier.

    Raises:
        ValueError: If ``recipient_type`` or ``override`` is invalid.
    """
    # Validate override first
    if o
select_channel function · python · L208-L281 (74 LOC)
channels/routing.py
def select_channel(
    recipient_type: str,
    urgency: str,
    work_hours: Optional[bool] = None,
) -> str:
    """Select the best outbound channel for a message.

    Channel selection matrix:

    **Self:**
    - urgent -> imessage
    - informational -> email
    - ephemeral -> notification
    - (other) -> email

    **Internal (work hours):**
    - informal -> teams
    - formal -> email
    - urgent -> teams
    - (other) -> email

    **Internal (off hours):**
    - urgent -> imessage
    - (other) -> queued (deferred until work hours)

    **External:**
    - (always) -> email

    Args:
        recipient_type: One of "self", "internal", "external".
        urgency: Message urgency/style: "urgent", "informational", "ephemeral",
                 "informal", "formal".
        work_hours: Whether it is currently work hours. If None, auto-detected
                    via ``is_work_hours()``.

    Returns:
        Channel name string: "imessage", "email", "notification", "teams"
parse_local_date_to_epoch function · python · L29-L35 (7 LOC)
chief/imessage_daemon.py
def parse_local_date_to_epoch(date_local: str) -> int:
    """Parse `YYYY-MM-DD HH:MM:SS` in local tz to epoch seconds."""
    dt = datetime.strptime(date_local, "%Y-%m-%d %H:%M:%S")
    local_tz = datetime.now().astimezone().tzinfo
    if local_tz is None:
        raise ValueError("Could not resolve local timezone")
    return int(dt.replace(tzinfo=local_tz).timestamp())
Citation: Repobility (2026). State of AI-Generated Code. https://repobility.com/research/
compute_lookback_minutes function · python · L38-L52 (15 LOC)
chief/imessage_daemon.py
def compute_lookback_minutes(
    watermark_epoch: int,
    now_epoch: int,
    bootstrap_minutes: int,
    max_minutes: int,
) -> int:
    if watermark_epoch <= 0:
        return bootstrap_minutes
    age_seconds = max(0, now_epoch - watermark_epoch)
    minutes = int(age_seconds / 60) + 2
    if minutes < 1:
        minutes = 1
    if minutes > max_minutes:
        minutes = max_minutes
    return minutes
compute_dispatch_lookback_minutes function · python · L55-L68 (14 LOC)
chief/imessage_daemon.py
def compute_dispatch_lookback_minutes(
    oldest_queued_epoch: int,
    now_epoch: int,
    max_minutes: int,
) -> int:
    if oldest_queued_epoch <= 0:
        return 5
    age_seconds = max(0, now_epoch - oldest_queued_epoch)
    minutes = int(age_seconds / 60) + 3
    if minutes < 1:
        minutes = 1
    if minutes > max_minutes:
        minutes = max_minutes
    return minutes
StateStore.__init__ method · python · L95-L100 (6 LOC)
chief/imessage_daemon.py
    def __init__(self, db_path: Path):
        self.db_path = db_path
        self.db_path.parent.mkdir(parents=True, exist_ok=True)
        self.conn = sqlite3.connect(self.db_path)
        self.conn.row_factory = sqlite3.Row
        self._init_schema()
StateStore._init_schema method · python · L105-L145 (41 LOC)
chief/imessage_daemon.py
    def _init_schema(self) -> None:
        self.conn.executescript(
            """
            CREATE TABLE IF NOT EXISTS message_events (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                guid TEXT NOT NULL UNIQUE,
                text TEXT NOT NULL,
                date_local TEXT NOT NULL,
                timestamp_epoch INTEGER NOT NULL,
                raw_json TEXT NOT NULL,
                created_at_utc TEXT NOT NULL
            );

            CREATE TABLE IF NOT EXISTS processing_jobs (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                message_guid TEXT NOT NULL UNIQUE REFERENCES message_events(guid),
                status TEXT NOT NULL CHECK(status IN ('queued', 'running', 'succeeded', 'failed')),
                attempts INTEGER NOT NULL DEFAULT 0,
                last_error TEXT,
                created_at_utc TEXT NOT NULL,
                updated_at_utc TEXT NOT NULL
            );

            CREATE TABLE IF NOT EXISTS outboun
StateStore.get_watermark_epoch method · python · L147-L156 (10 LOC)
chief/imessage_daemon.py
    def get_watermark_epoch(self) -> int:
        row = self.conn.execute(
            "SELECT value FROM watermarks WHERE key = 'last_message_epoch'"
        ).fetchone()
        if not row:
            return 0
        try:
            return int(row["value"])
        except (TypeError, ValueError):
            return 0
StateStore.set_watermark_epoch method · python · L158-L169 (12 LOC)
chief/imessage_daemon.py
    def set_watermark_epoch(self, epoch: int) -> None:
        self.conn.execute(
            """
            INSERT INTO watermarks(key, value, updated_at_utc)
            VALUES('last_message_epoch', ?, ?)
            ON CONFLICT(key) DO UPDATE SET
                value = excluded.value,
                updated_at_utc = excluded.updated_at_utc
            """,
            (str(epoch), utc_now_iso()),
        )
        self.conn.commit()
StateStore.ingest_messages method · python · L171-L197 (27 LOC)
chief/imessage_daemon.py
    def ingest_messages(self, messages: list[IngestedMessage]) -> tuple[int, int]:
        inserted = 0
        max_epoch = 0
        now = utc_now_iso()
        for msg in messages:
            cur = self.conn.execute(
                """
                INSERT OR IGNORE INTO message_events
                (guid, text, date_local, timestamp_epoch, raw_json, created_at_utc)
                VALUES (?, ?, ?, ?, ?, ?)
                """,
                (msg.guid, msg.text, msg.date_local, msg.timestamp_epoch, msg.raw_json, now),
            )
            if cur.rowcount == 1:
                inserted += 1
                self.conn.execute(
                    """
                    INSERT OR IGNORE INTO processing_jobs
                    (message_guid, status, attempts, last_error, created_at_utc, updated_at_utc)
                    VALUES (?, 'queued', 0, NULL, ?, ?)
                    """,
                    (msg.guid, now, now),
                )
            if msg.timestamp_epoc
StateStore.list_queued_jobs method · python · L199-L218 (20 LOC)
chief/imessage_daemon.py
    def list_queued_jobs(self, limit: int) -> list[dict[str, Any]]:
        rows = self.conn.execute(
            """
            SELECT
                j.id,
                j.message_guid,
                j.status,
                j.attempts,
                e.timestamp_epoch,
                e.date_local,
                e.text
            FROM processing_jobs j
            JOIN message_events e ON e.guid = j.message_guid
            WHERE j.status = 'queued'
            ORDER BY e.timestamp_epoch ASC
            LIMIT ?
            """,
            (limit,),
        ).fetchall()
        return [dict(row) for row in rows]
Repobility · open methodology · https://repobility.com/research/
StateStore.mark_jobs_running method · python · L220-L237 (18 LOC)
chief/imessage_daemon.py
    def mark_jobs_running(self, job_ids: list[int]) -> None:
        if not job_ids:
            return
        now = utc_now_iso()
        placeholders = ",".join("?" for _ in job_ids)
        params: list[Any] = [now]
        params.extend(job_ids)
        self.conn.execute(
            f"""
            UPDATE processing_jobs
            SET status = 'running',
                attempts = attempts + 1,
                updated_at_utc = ?
            WHERE id IN ({placeholders})
            """,
            tuple(params),
        )
        self.conn.commit()
StateStore.mark_job_result method · python · L239-L251 (13 LOC)
chief/imessage_daemon.py
    def mark_job_result(self, message_guid: str, success: bool, error: str = "") -> None:
        status = "succeeded" if success else "failed"
        now = utc_now_iso()
        err_val: Any = None if success else error[:1000]
        self.conn.execute(
            """
            UPDATE processing_jobs
            SET status = ?, last_error = ?, updated_at_utc = ?
            WHERE message_guid = ?
            """,
            (status, err_val, now, message_guid),
        )
        self.conn.commit()
StateStore.count_jobs_by_status method · python · L253-L258 (6 LOC)
chief/imessage_daemon.py
    def count_jobs_by_status(self, status: str) -> int:
        row = self.conn.execute(
            "SELECT COUNT(*) AS c FROM processing_jobs WHERE status = ?",
            (status,),
        ).fetchone()
        return int(row["c"]) if row else 0
IMessageDaemon.run_forever method · python · L275-L294 (20 LOC)
chief/imessage_daemon.py
    def run_forever(self) -> None:
        logger.info(
            "Starting iMessage daemon (poll=%ss, db=%s)",
            self.config.poll_interval_seconds,
            self.config.state_db_path,
        )
        while True:
            try:
                result = self.run_once()
                if result["ingested"] > 0 or result["dispatched"] > 0:
                    logger.info(
                        "Cycle complete: ingested=%d dispatched=%d queued=%d failed=%d",
                        result["ingested"],
                        result["dispatched"],
                        self.store.count_jobs_by_status("queued"),
                        self.store.count_jobs_by_status("failed"),
                    )
            except Exception:
                logger.exception("Daemon cycle failed")
            time.sleep(self.config.poll_interval_seconds)
IMessageDaemon._ingest_cycle method · python · L296-L341 (46 LOC)
chief/imessage_daemon.py
    def _ingest_cycle(self) -> int:
        now_epoch = int(time.time())
        watermark = self.store.get_watermark_epoch()
        lookback = compute_lookback_minutes(
            watermark_epoch=watermark,
            now_epoch=now_epoch,
            bootstrap_minutes=self.config.bootstrap_lookback_minutes,
            max_minutes=self.config.max_lookback_minutes,
        )

        cmd = [str(self.config.reader_path), "--minutes", str(lookback)]
        proc = subprocess.run(cmd, capture_output=True, text=True, check=False)
        if proc.returncode != 0:
            stderr = proc.stderr.strip()
            logger.error("imessage-reader failed (code=%d): %s", proc.returncode, stderr)
            return 0

        raw = proc.stdout.strip() or "[]"
        parsed = json.loads(raw)
        if not isinstance(parsed, list):
            raise ValueError("imessage-reader output was not a JSON array")

        messages: list[IngestedMessage] = []
        for row in parsed:
            if
IMessageDaemon._dispatch_cycle method · python · L343-L379 (37 LOC)
chief/imessage_daemon.py
    def _dispatch_cycle(self) -> int:
        queued = self.store.list_queued_jobs(limit=self.config.dispatch_batch_size)
        if not queued:
            return 0

        job_ids = [int(row["id"]) for row in queued]
        guids = [str(row["message_guid"]) for row in queued]
        oldest_epoch = min(int(row["timestamp_epoch"]) for row in queued)
        lookback = compute_dispatch_lookback_minutes(
            oldest_queued_epoch=oldest_epoch,
            now_epoch=int(time.time()),
            max_minutes=self.config.max_lookback_minutes,
        )

        self.store.mark_jobs_running(job_ids)
        cmd = [str(self.config.inbox_monitor_path), "--interval", str(lookback)]
        proc = subprocess.run(cmd, capture_output=True, text=True, check=False)
        if proc.returncode != 0:
            err = (proc.stderr or proc.stdout or "inbox-monitor failed").strip()
            for guid in guids:
                self.store.mark_job_result(guid, success=False, error=err)
         
IMessageDaemon._load_processed_ids method · python · L381-L392 (12 LOC)
chief/imessage_daemon.py
    def _load_processed_ids(self) -> set[str]:
        if not self.config.processed_file.exists():
            return set()
        try:
            payload = json.loads(self.config.processed_file.read_text(encoding="utf-8"))
        except json.JSONDecodeError:
            logger.warning("Processed file is not valid JSON: %s", self.config.processed_file)
            return set()
        values = payload.get("processed_ids", [])
        if not isinstance(values, list):
            return set()
        return {str(v) for v in values}
UnifiedCalendarService.__init__ method · python · L14-L24 (11 LOC)
connectors/calendar_unified.py
    def __init__(
        self,
        router: ProviderRouter,
        ownership_db_path: Path,
        require_all_read_providers_success: bool = True,
    ):
        self.router = router
        self.ownership_db_path = Path(ownership_db_path)
        self.require_all_read_providers_success = bool(require_all_read_providers_success)
        self.ownership_db_path.parent.mkdir(parents=True, exist_ok=True)
        self._init_ownership_db()
Powered by Repobility — scan your code at https://repobility.com
UnifiedCalendarService._init_ownership_db method · python · L35-L50 (16 LOC)
connectors/calendar_unified.py
    def _init_ownership_db(self) -> None:
        with self._open_ownership_db() as conn:
            conn.executescript(
                """
                CREATE TABLE IF NOT EXISTS event_ownership (
                    unified_uid TEXT PRIMARY KEY,
                    provider TEXT NOT NULL,
                    native_id TEXT NOT NULL,
                    calendar_name TEXT,
                    updated_at_utc TEXT NOT NULL
                );
                CREATE INDEX IF NOT EXISTS idx_event_ownership_native
                    ON event_ownership(native_id, updated_at_utc DESC);
                """
            )
            conn.commit()
UnifiedCalendarService._upsert_ownership method · python · L52-L72 (21 LOC)
connectors/calendar_unified.py
    def _upsert_ownership(self, event: dict) -> None:
        unified_uid = str(event.get("unified_uid", "")).strip()
        provider = normalize_provider_name(str(event.get("provider", "")))
        native_id = str(event.get("native_id", "")).strip()
        calendar_name = str(event.get("calendar", "") or event.get("calendar_id", "")).strip()
        if not unified_uid or not provider or not native_id:
            return
        with self._open_ownership_db() as conn:
            conn.execute(
                """
                INSERT INTO event_ownership(unified_uid, provider, native_id, calendar_name, updated_at_utc)
                VALUES(?, ?, ?, ?, ?)
                ON CONFLICT(unified_uid) DO UPDATE SET
                    provider=excluded.provider,
                    native_id=excluded.native_id,
                    calendar_name=excluded.calendar_name,
                    updated_at_utc=excluded.updated_at_utc
                """,
                (unified_uid, provider, 
UnifiedCalendarService._delete_ownership method · python · L74-L79 (6 LOC)
connectors/calendar_unified.py
    def _delete_ownership(self, unified_uid: str) -> None:
        if not unified_uid:
            return
        with self._open_ownership_db() as conn:
            conn.execute("DELETE FROM event_ownership WHERE unified_uid = ?", (unified_uid,))
            conn.commit()
UnifiedCalendarService._lookup_ownership method · python · L81-L105 (25 LOC)
connectors/calendar_unified.py
    def _lookup_ownership(self, event_uid: str) -> tuple[str, str] | None:
        event_uid = (event_uid or "").strip()
        if not event_uid:
            return None
        with self._open_ownership_db() as conn:
            row = conn.execute(
                "SELECT provider, native_id FROM event_ownership WHERE unified_uid = ?",
                (event_uid,),
            ).fetchone()
            if row:
                return str(row["provider"]), str(row["native_id"])

            row = conn.execute(
                """
                SELECT provider, native_id
                FROM event_ownership
                WHERE native_id = ?
                ORDER BY updated_at_utc DESC
                LIMIT 1
                """,
                (event_uid,),
            ).fetchone()
            if row:
                return str(row["provider"]), str(row["native_id"])
        return None
UnifiedCalendarService._tag_event method · python · L107-L120 (14 LOC)
connectors/calendar_unified.py
    def _tag_event(self, event: dict, provider_name: str) -> dict:
        tagged = dict(event)
        provider = normalize_provider_name(provider_name) or provider_name
        native_id = str(tagged.get("native_id", "") or tagged.get("uid", "")).strip()
        if native_id:
            tagged["native_id"] = native_id
        tagged["provider"] = provider
        if not tagged.get("calendar_id"):
            tagged["calendar_id"] = str(tagged.get("calendar", "") or "")
        if not tagged.get("source_account"):
            tagged["source_account"] = str(tagged.get("source", "") or "")
        if not tagged.get("unified_uid"):
            tagged["unified_uid"] = f"{provider}:{native_id}" if native_id else ""
        return tagged
UnifiedCalendarService._is_error_payload method · python · L123-L128 (6 LOC)
connectors/calendar_unified.py
    def _is_error_payload(payload: object) -> bool:
        if isinstance(payload, dict):
            return bool(payload.get("error"))
        if isinstance(payload, list) and payload and isinstance(payload[0], dict):
            return bool(payload[0].get("error"))
        return False
UnifiedCalendarService._build_dual_read_error method · python · L131-L140 (10 LOC)
connectors/calendar_unified.py
    def _build_dual_read_error(required: list[str], succeeded: list[str], rows: list[dict], errors: list[dict]) -> dict:
        failed = [p for p in required if p not in succeeded]
        return {
            "error": "Dual-read policy requires all connected providers to succeed",
            "providers_required": required,
            "providers_succeeded": succeeded,
            "providers_failed": failed,
            "partial_results": rows,
            "provider_errors": errors,
        }
UnifiedCalendarService._filter_source method · python · L142-L158 (17 LOC)
connectors/calendar_unified.py
    def _filter_source(self, rows: list[dict], source_filter: str) -> list[dict]:
        needle = (source_filter or "").strip().lower()
        if not needle:
            return rows
        filtered: list[dict] = []
        for row in rows:
            haystack = " ".join(
                [
                    str(row.get("provider", "") or ""),
                    str(row.get("source_account", "") or ""),
                    str(row.get("calendar", "") or ""),
                    str(row.get("calendar_id", "") or ""),
                ]
            ).lower()
            if needle in haystack:
                filtered.append(row)
        return filtered
All rows scored by the Repobility analyzer (https://repobility.com)
UnifiedCalendarService._event_dedupe_key method · python · L161-L168 (8 LOC)
connectors/calendar_unified.py
    def _event_dedupe_key(event: dict) -> tuple:
        ical_uid = str(event.get("ical_uid", "")).strip()
        if ical_uid:
            return ("ical_uid", ical_uid.lower())
        title = str(event.get("title", "")).strip().lower()
        start = str(event.get("start", "")).strip()
        end = str(event.get("end", "")).strip()
        return ("fallback", title, start, end)
UnifiedCalendarService._dedupe_events method · python · L170-L179 (10 LOC)
connectors/calendar_unified.py
    def _dedupe_events(self, rows: list[dict]) -> list[dict]:
        seen: dict[tuple, dict] = {}
        for row in rows:
            key = self._event_dedupe_key(row)
            if key in seen:
                continue
            seen[key] = row
        deduped = list(seen.values())
        deduped.sort(key=lambda r: str(r.get("start", "")))
        return deduped
UnifiedCalendarService._provider_from_prefixed_uid method · python · L182-L190 (9 LOC)
connectors/calendar_unified.py
    def _provider_from_prefixed_uid(event_uid: str) -> tuple[str, str] | None:
        uid = (event_uid or "").strip()
        if ":" not in uid:
            return None
        provider_token, native_id = uid.split(":", 1)
        provider = normalize_provider_name(provider_token)
        if not provider or not native_id:
            return None
        return provider, native_id
‹ prevpage 3 / 11next ›