← back to jrichyrich__chief-of-staff

Function bodies 522 total

All specs Real LLM only Function bodies
UnifiedCalendarService.list_calendars method · python · L192-L226 (35 LOC)
connectors/calendar_unified.py
    def list_calendars(
        self,
        provider_preference: str = "auto",
        source_filter: str = "",
    ) -> list[dict]:
        decision = self.router.decide_read(provider_preference=provider_preference)
        if not decision.providers:
            return [{"error": "No connected calendar providers available"}]

        rows: list[dict] = []
        errors: list[dict] = []
        succeeded: set[str] = set()
        for provider_name in decision.providers:
            provider = self.router.get_provider(provider_name)
            if provider is None:
                continue
            payload = provider.list_calendars()
            if self._is_error_payload(payload):
                errors.extend(payload if isinstance(payload, list) else [payload])
                continue
            succeeded.add(provider_name)
            rows.extend(payload if isinstance(payload, list) else [payload])

        rows = self._filter_source(rows, source_filter=source_filter)
        
UnifiedCalendarService.get_events method · python · L228-L271 (44 LOC)
connectors/calendar_unified.py
    def get_events(
        self,
        start_dt: datetime,
        end_dt: datetime,
        calendar_names: Optional[list[str]] = None,
        provider_preference: str = "auto",
        source_filter: str = "",
    ) -> list[dict]:
        decision = self.router.decide_read(provider_preference=provider_preference)
        if not decision.providers:
            return [{"error": "No connected calendar providers available"}]

        rows: list[dict] = []
        errors: list[dict] = []
        succeeded: set[str] = set()
        for provider_name in decision.providers:
            provider = self.router.get_provider(provider_name)
            if provider is None:
                continue
            payload = provider.get_events(start_dt, end_dt, calendar_names=calendar_names)
            if self._is_error_payload(payload):
                errors.extend(payload if isinstance(payload, list) else [payload])
                continue
            succeeded.add(provider_name)
           
UnifiedCalendarService.search_events method · python · L273-L316 (44 LOC)
connectors/calendar_unified.py
    def search_events(
        self,
        query: str,
        start_dt: datetime,
        end_dt: datetime,
        provider_preference: str = "auto",
        source_filter: str = "",
    ) -> list[dict]:
        decision = self.router.decide_read(provider_preference=provider_preference)
        if not decision.providers:
            return [{"error": "No connected calendar providers available"}]

        rows: list[dict] = []
        errors: list[dict] = []
        succeeded: set[str] = set()
        for provider_name in decision.providers:
            provider = self.router.get_provider(provider_name)
            if provider is None:
                continue
            payload = provider.search_events(query, start_dt, end_dt)
            if self._is_error_payload(payload):
                errors.extend(payload if isinstance(payload, list) else [payload])
                continue
            succeeded.add(provider_name)
            provider_rows = payload if isinstance(payload, li
UnifiedCalendarService._resolve_write_provider method · python · L318-L362 (45 LOC)
connectors/calendar_unified.py
    def _resolve_write_provider(
        self,
        event_uid: str = "",
        target_provider: str = "",
        calendar_name: str = "",
        provider_preference: str = "auto",
    ) -> tuple[list[str], str, str, str]:
        if target_provider:
            decision = self.router.decide_write(
                target_provider=target_provider,
                calendar_name=calendar_name,
                provider_preference=provider_preference,
            )
            native_id = event_uid
            parsed = self._provider_from_prefixed_uid(event_uid)
            if parsed:
                _, native_id = parsed
            return decision.providers, decision.preferred_provider, decision.fallback_provider, native_id

        parsed = self._provider_from_prefixed_uid(event_uid)
        if parsed:
            provider, native_id = parsed
            decision = self.router.decide_write(
                target_provider=provider,
                calendar_name=calendar_name,
     
UnifiedCalendarService.create_event method · python · L364-L406 (43 LOC)
connectors/calendar_unified.py
    def create_event(
        self,
        title: str,
        start_dt: datetime,
        end_dt: datetime,
        calendar_name: Optional[str] = None,
        location: Optional[str] = None,
        notes: Optional[str] = None,
        is_all_day: bool = False,
        target_provider: str = "",
        provider_preference: str = "auto",
    ) -> dict:
        providers, preferred, _, _ = self._resolve_write_provider(
            target_provider=target_provider,
            calendar_name=calendar_name or "",
            provider_preference=provider_preference,
        )
        if not providers:
            return {"error": "No connected calendar providers available for write"}

        errors: list[str] = []
        for provider_name in providers:
            provider = self.router.get_provider(provider_name)
            if provider is None:
                continue
            result = provider.create_event(
                title=title,
                start_dt=start_dt,
        
UnifiedCalendarService.update_event method · python · L408-L440 (33 LOC)
connectors/calendar_unified.py
    def update_event(
        self,
        event_uid: str,
        calendar_name: Optional[str] = None,
        target_provider: str = "",
        provider_preference: str = "auto",
        **kwargs,
    ) -> dict:
        providers, preferred, _, native_id = self._resolve_write_provider(
            event_uid=event_uid,
            target_provider=target_provider,
            calendar_name=calendar_name or "",
            provider_preference=provider_preference,
        )
        if not providers:
            return {"error": "No connected calendar providers available for write"}
        native_id = native_id or event_uid

        errors: list[str] = []
        for provider_name in providers:
            provider = self.router.get_provider(provider_name)
            if provider is None:
                continue
            result = provider.update_event(native_id, calendar_name=calendar_name, **kwargs)
            if result.get("error"):
                errors.append(f"{provider_name
UnifiedCalendarService.delete_event method · python · L442-L476 (35 LOC)
connectors/calendar_unified.py
    def delete_event(
        self,
        event_uid: str,
        calendar_name: Optional[str] = None,
        target_provider: str = "",
        provider_preference: str = "auto",
    ) -> dict:
        providers, preferred, _, native_id = self._resolve_write_provider(
            event_uid=event_uid,
            target_provider=target_provider,
            calendar_name=calendar_name or "",
            provider_preference=provider_preference,
        )
        if not providers:
            return {"error": "No connected calendar providers available for write"}
        native_id = native_id or event_uid

        errors: list[str] = []
        for provider_name in providers:
            provider = self.router.get_provider(provider_name)
            if provider is None:
                continue
            result = provider.delete_event(native_id, calendar_name=calendar_name)
            if result.get("error"):
                errors.append(f"{provider_name}: {result['error']}")
     
Repobility · MCP-ready · https://repobility.com
ClaudeM365Bridge._sanitize_for_prompt method · python · L14-L23 (10 LOC)
connectors/claude_m365_bridge.py
    def _sanitize_for_prompt(text: str, max_length: int = 500) -> str:
        """Sanitize user input before embedding in a Claude prompt."""
        if text is None:
            return ""
        # Strip control characters (keep only printable + space)
        sanitized = "".join(c for c in str(text) if c.isprintable() or c == " ")
        # Truncate
        if len(sanitized) > max_length:
            sanitized = sanitized[:max_length] + "..."
        return sanitized
ClaudeM365Bridge.__init__ method · python · L25-L39 (15 LOC)
connectors/claude_m365_bridge.py
    def __init__(
        self,
        claude_bin: str = "claude",
        mcp_config: str = "",
        model: str = "sonnet",
        timeout_seconds: int = 90,
        detect_timeout_seconds: int = 5,
        runner: Callable[..., subprocess.CompletedProcess] | None = None,
    ):
        self.claude_bin = claude_bin
        self.mcp_config = mcp_config
        self.model = model
        self.timeout_seconds = timeout_seconds
        self.detect_timeout_seconds = detect_timeout_seconds
        self._runner = runner or subprocess.run
ClaudeM365Bridge.is_connector_connected method · python · L41-L49 (9 LOC)
connectors/claude_m365_bridge.py
    def is_connector_connected(self) -> bool:
        """Probe ``<claude_bin> mcp list`` for a connected Microsoft 365 entry.

        Returns ``False`` when the command could not be run or exits non-zero.
        Otherwise returns whether stdout/stderr contain a ``microsoft 365:``
        line that also carries the word ``connected`` (case-insensitive).
        """
        command = [self.claude_bin, "mcp", "list"]
        if self.mcp_config:
            command += ["--mcp-config", self.mcp_config]

        result = self._run(command, timeout=self.detect_timeout_seconds)
        if result is None:
            return False
        if result.returncode != 0:
            return False

        # Both streams are searched because the status line may land on either.
        combined = f"{result.stdout or ''}\n{result.stderr or ''}"
        found = re.search(r"microsoft\s*365:.*\bconnected\b", combined, flags=re.IGNORECASE)
        return found is not None
ClaudeM365Bridge.list_calendars method · python · L51-L65 (15 LOC)
connectors/claude_m365_bridge.py
    def list_calendars(self) -> list[dict]:
        """Ask Claude (via the Microsoft 365 connector) for the calendar list.

        Returns a one-element list holding the error payload when the
        structured invocation reports ``"error"``; otherwise a list of shallow
        copies of every dict found under ``"results"``.
        """
        response = self._invoke_structured(
            (
                "Use only Microsoft 365 MCP connector tools to list Outlook/Exchange calendars. "
                "Return calendar rows with useful fields such as name, calendar_id, source_account, type, and color "
                "when available."
            ),
            {
                "type": "object",
                "properties": {"results": {"type": "array", "items": {"type": "object"}}},
                "required": ["results"],
            },
        )
        if response.get("error"):
            return [response]

        calendars: list[dict] = []
        for entry in response.get("results", []):
            # Non-dict entries are dropped.
            if isinstance(entry, dict):
                calendars.append(dict(entry))
        return calendars
ClaudeM365Bridge.get_events method · python · L67-L93 (27 LOC)
connectors/claude_m365_bridge.py
    def get_events(
        self,
        start_dt: datetime,
        end_dt: datetime,
        calendar_names: Optional[list[str]] = None,
    ) -> list[dict]:
        schema = {
            "type": "object",
            "properties": {"results": {"type": "array", "items": {"type": "object"}}},
            "required": ["results"],
        }
        filter_clause = (
            "Limit to these calendars when possible: "
            f"<user_calendar_names>{', '.join(self._sanitize_for_prompt(n) for n in calendar_names)}</user_calendar_names>. "
            if calendar_names
            else ""
        )
        prompt = (
            "Use only Microsoft 365 MCP connector tools to get calendar events. "
            f"Time range start={start_dt.isoformat()} end={end_dt.isoformat()}. "
            f"{filter_clause}"
            "Return results with fields like uid/native_id, title, start, end, calendar/calendar_id, source_account."
        )
        data = self._invoke_structured(prompt, 
ClaudeM365Bridge.search_events method · python · L95-L110 (16 LOC)
connectors/claude_m365_bridge.py
    def search_events(self, query: str, start_dt: datetime, end_dt: datetime) -> list[dict]:
        """Search calendar events by title text through the Claude bridge.

        The query is sanitized and wrapped in ``<user_query>`` tags before it
        is placed in the prompt. Returns ``[error_payload]`` when the
        invocation reports ``"error"``, else shallow copies of the dict rows
        under ``"results"``.
        """
        safe_query = self._sanitize_for_prompt(query)
        instructions = "".join(
            [
                "Use only Microsoft 365 MCP connector tools to search Outlook/Exchange calendar events by title text. ",
                f"query=<user_query>{safe_query}</user_query>, ",
                f"start={start_dt.isoformat()}, end={end_dt.isoformat()}. ",
                "Return results with fields like uid/native_id, title, start, end, calendar/calendar_id, source_account.",
            ]
        )
        result_schema = {
            "type": "object",
            "properties": {"results": {"type": "array", "items": {"type": "object"}}},
            "required": ["results"],
        }
        response = self._invoke_structured(instructions, result_schema)
        if response.get("error"):
            return [response]
        matches = filter(lambda entry: isinstance(entry, dict), response.get("results", []))
        return [dict(entry) for entry in matches]
ClaudeM365Bridge.create_event method · python · L112-L141 (30 LOC)
connectors/claude_m365_bridge.py
    def create_event(
        self,
        title: str,
        start_dt: datetime,
        end_dt: datetime,
        calendar_name: Optional[str] = None,
        location: Optional[str] = None,
        notes: Optional[str] = None,
        is_all_day: bool = False,
    ) -> dict:
        schema = {
            "type": "object",
            "properties": {"result": {"type": "object"}},
            "required": ["result"],
        }
        prompt = (
            "Use only Microsoft 365 MCP connector tools to create a calendar event in Outlook/Exchange. "
            f"title=<user_title>{self._sanitize_for_prompt(title)}</user_title>, "
            f"start={start_dt.isoformat()}, end={end_dt.isoformat()}, "
            f"calendar_name=<user_calendar_name>{self._sanitize_for_prompt(calendar_name)}</user_calendar_name>, "
            f"location=<user_location>{self._sanitize_for_prompt(location)}</user_location>, "
            f"notes=<user_notes>{self._sanitize_for_prompt(notes, max_length
ClaudeM365Bridge.update_event method · python · L143-L169 (27 LOC)
connectors/claude_m365_bridge.py
    def update_event(self, event_uid: str, calendar_name: Optional[str] = None, **kwargs) -> dict:
        schema = {
            "type": "object",
            "properties": {"result": {"type": "object"}},
            "required": ["result"],
        }
        safe_kwargs = dict(kwargs)
        if isinstance(safe_kwargs.get("start_dt"), datetime):
            safe_kwargs["start_dt"] = safe_kwargs["start_dt"].isoformat()
        if isinstance(safe_kwargs.get("end_dt"), datetime):
            safe_kwargs["end_dt"] = safe_kwargs["end_dt"].isoformat()
        # Sanitize user-supplied string values in kwargs
        sanitized_kwargs = {}
        for k, v in safe_kwargs.items():
            sanitized_kwargs[k] = self._sanitize_for_prompt(v) if isinstance(v, str) else v
        prompt = (
            "Use only Microsoft 365 MCP connector tools to update an existing Outlook/Exchange calendar event. "
            f"event_uid=<user_event_uid>{self._sanitize_for_prompt(event_uid)}</user_event_uid>
Repobility · code-quality intelligence · https://repobility.com
ClaudeM365Bridge.delete_event method · python · L171-L190 (20 LOC)
connectors/claude_m365_bridge.py
    def delete_event(self, event_uid: str, calendar_name: Optional[str] = None) -> dict:
        """Delete an Outlook/Exchange event through the Claude bridge.

        Both identifiers are sanitized and tag-wrapped before being placed in
        the prompt. An error payload is returned as-is; a successful payload
        is returned as a shallow copy.
        """
        safe_uid = self._sanitize_for_prompt(event_uid)
        safe_calendar = self._sanitize_for_prompt(calendar_name)
        instructions = (
            "Use only Microsoft 365 MCP connector tools to delete an Outlook/Exchange calendar event. "
            + f"event_uid=<user_event_uid>{safe_uid}</user_event_uid>, "
            + f"calendar_name=<user_calendar_name>{safe_calendar}</user_calendar_name>. "
            + "Return status and event_uid."
        )
        status_schema = {
            "type": "object",
            "properties": {
                "status": {"type": "string"},
                "event_uid": {"type": "string"},
                "error": {"type": ["string", "null"]},
            },
            "required": ["status"],
        }
        outcome = self._invoke_structured(instructions, status_schema)
        return outcome if outcome.get("error") else dict(outcome)
ClaudeM365Bridge._invoke_structured method · python · L192-L240 (49 LOC)
connectors/claude_m365_bridge.py
    def _invoke_structured(self, prompt: str, schema: dict) -> dict:
        args = [
            self.claude_bin,
            "-p",
            prompt,
            "--output-format",
            "json",
            "--json-schema",
            json.dumps(schema),
            "--no-session-persistence",
            "--disable-slash-commands",
            "--model",
            self.model,
        ]
        if self.mcp_config:
            args.extend(["--mcp-config", self.mcp_config])

        proc = self._run(args, timeout=self.timeout_seconds)
        if proc is None:
            return {"error": "Failed to invoke Claude CLI"}
        if proc.returncode != 0:
            err = (proc.stderr or proc.stdout or "").strip()
            return {"error": f"Claude bridge command failed: {err or 'unknown error'}"}

        payload = self._parse_output_json(proc.stdout or "")
        if payload is None:
            return {"error": "Claude bridge returned invalid JSON"}

        structured = pa
ClaudeM365Bridge._run method · python · L242-L254 (13 LOC)
connectors/claude_m365_bridge.py
    def _run(self, args: list[str], timeout: int) -> subprocess.CompletedProcess | None:
        try:
            return self._runner(
                args,
                capture_output=True,
                text=True,
                timeout=timeout,
                check=False,
            )
        except FileNotFoundError:
            return None
        except Exception:
            return None
ClaudeM365Bridge._parse_output_json method · python · L257-L265 (9 LOC)
connectors/claude_m365_bridge.py
    def _parse_output_json(text: str) -> dict | None:
        text = (text or "").strip()
        if not text:
            return None
        try:
            data = json.loads(text)
            return data if isinstance(data, dict) else None
        except json.JSONDecodeError:
            return None
ClaudeM365Bridge._parse_first_json_object method · python · L268-L303 (36 LOC)
connectors/claude_m365_bridge.py
    def _parse_first_json_object(text: str) -> dict | None:
        s = text or ""
        n = len(s)
        i = 0
        while i < n:
            if s[i] != "{":
                i += 1
                continue
            start = i
            depth = 0
            in_string = False
            escape = False
            j = i
            while j < n:
                c = s[j]
                if escape:
                    escape = False
                elif c == "\\" and in_string:
                    escape = True
                elif c == '"':
                    in_string = not in_string
                elif not in_string:
                    if c == "{":
                        depth += 1
                    elif c == "}":
                        depth -= 1
                        if depth == 0:
                            candidate = s[start:j + 1]
                            try:
                                parsed = json.loads(candidate)
                                ret
CalendarProvider.get_events method · python · L22-L28 (7 LOC)
connectors/provider_base.py
    def get_events(
        self,
        start_dt: datetime,
        end_dt: datetime,
        calendar_names: Optional[list[str]] = None,
    ) -> list[dict]:
        """Get events within a date range.

        Args:
            start_dt: Start of the date range.
            end_dt: End of the date range.
            calendar_names: Optional calendar names to restrict the lookup to.
                NOTE(review): ``None`` presumably means "no restriction" --
                confirm against the concrete providers.

        Returns:
            A list of event dicts, one per event.
        """
CalendarProvider.create_event method · python · L31-L41 (11 LOC)
connectors/provider_base.py
    def create_event(
        self,
        title: str,
        start_dt: datetime,
        end_dt: datetime,
        calendar_name: Optional[str] = None,
        location: Optional[str] = None,
        notes: Optional[str] = None,
        is_all_day: bool = False,
    ) -> dict:
        """Create an event.

        Args:
            title: Event title.
            start_dt: Event start.
            end_dt: Event end.
            calendar_name: Optional name of the calendar to create the event in.
            location: Optional event location.
            notes: Optional free-form notes for the event.
            is_all_day: Whether the event is an all-day event.

        Returns:
            A dict describing the outcome of the create call.
        """
CalendarProvider.update_event method · python · L44-L50 (7 LOC)
connectors/provider_base.py
    def update_event(
        self,
        event_uid: str,
        calendar_name: Optional[str] = None,
        **kwargs,
    ) -> dict:
        """Update an existing event.

        Args:
            event_uid: Identifier of the event to update.
            calendar_name: Optional name of the calendar holding the event.
            **kwargs: Fields to change; the accepted keys are defined by the
                concrete provider.

        Returns:
            A dict describing the outcome of the update call.
        """
Repobility (the analyzer behind this table) · https://repobility.com
CalendarProvider.search_events method · python · L57-L63 (7 LOC)
connectors/provider_base.py
    def search_events(
        self,
        query: str,
        start_dt: datetime,
        end_dt: datetime,
    ) -> list[dict]:
        """Search events in a date range.

        Args:
            query: Search text.
            start_dt: Start of the date range.
            end_dt: End of the date range.

        Returns:
            A list of event dicts, one per matching event.
        """
AppleCalendarProvider.list_calendars method · python · L22-L31 (10 LOC)
connectors/providers/apple_provider.py
    def list_calendars(self) -> list[dict]:
        """List the store's calendars, tagged with provider metadata.

        Rows flagged by ``_contains_error`` are returned untouched. Otherwise
        the calendar-name -> source map is rebuilt from the dict rows and each
        dict row is returned as a tagged copy.
        """
        calendars = self.store.list_calendars()
        if self._contains_error(calendars):
            return calendars

        # Refresh the cache used to resolve an event's source account.
        source_by_name: dict = {}
        for entry in calendars:
            if isinstance(entry, dict):
                source_by_name[str(entry.get("name", ""))] = str(entry.get("source", "") or "")
        self._calendar_source_map = source_by_name

        tagged_rows = []
        for entry in calendars:
            if isinstance(entry, dict):
                tagged_rows.append(self._tag_calendar(dict(entry)))
        return tagged_rows
AppleCalendarProvider.get_events method · python · L33-L42 (10 LOC)
connectors/providers/apple_provider.py
    def get_events(
        self,
        start_dt: datetime,
        end_dt: datetime,
        calendar_names: Optional[list[str]] = None,
    ) -> list[dict]:
        """Fetch events from the store for a date range and tag them.

        Rows flagged by ``_contains_error`` are passed back unchanged; every
        other dict row is returned as a tagged copy, non-dict rows are dropped.
        """
        events = self.store.get_events(start_dt, end_dt, calendar_names=calendar_names)
        if self._contains_error(events):
            return events
        tagged_events = []
        for event in events:
            if isinstance(event, dict):
                tagged_events.append(self._tag_event(dict(event)))
        return tagged_events
AppleCalendarProvider.create_event method · python · L44-L65 (22 LOC)
connectors/providers/apple_provider.py
    def create_event(
        self,
        title: str,
        start_dt: datetime,
        end_dt: datetime,
        calendar_name: Optional[str] = None,
        location: Optional[str] = None,
        notes: Optional[str] = None,
        is_all_day: bool = False,
    ) -> dict:
        """Create an event in the store and return it tagged.

        All arguments are forwarded to the store as keyword arguments. A
        result carrying a truthy ``"error"`` is returned unchanged.
        """
        event_fields = {
            "title": title,
            "start_dt": start_dt,
            "end_dt": end_dt,
            "calendar_name": calendar_name,
            "location": location,
            "notes": notes,
            "is_all_day": is_all_day,
        }
        created = self.store.create_event(**event_fields)
        return created if created.get("error") else self._tag_event(dict(created))
AppleCalendarProvider.update_event method · python · L67-L76 (10 LOC)
connectors/providers/apple_provider.py
    def update_event(
        self,
        event_uid: str,
        calendar_name: Optional[str] = None,
        **kwargs,
    ) -> dict:
        """Update an event in the store and return it tagged.

        ``kwargs`` are forwarded to the store untouched. A result carrying a
        truthy ``"error"`` is returned unchanged.
        """
        updated = self.store.update_event(event_uid, calendar_name=calendar_name, **kwargs)
        if not updated.get("error"):
            return self._tag_event(dict(updated))
        return updated
AppleCalendarProvider.delete_event method · python · L78-L86 (9 LOC)
connectors/providers/apple_provider.py
    def delete_event(self, event_uid: str, calendar_name: Optional[str] = None) -> dict:
        """Delete an event from the store and tag the outcome.

        A result carrying a truthy ``"error"`` is returned unchanged;
        otherwise a copy is returned with ``provider``, ``native_id`` and
        ``unified_uid`` (``"<provider>:<event_uid>"``) filled in.
        """
        outcome = self.store.delete_event(event_uid, calendar_name=calendar_name)
        if outcome.get("error"):
            return outcome
        return {
            **outcome,
            "provider": self.provider_name,
            "native_id": event_uid,
            "unified_uid": f"{self.provider_name}:{event_uid}",
        }
AppleCalendarProvider.search_events method · python · L88-L97 (10 LOC)
connectors/providers/apple_provider.py
    def search_events(
        self,
        query: str,
        start_dt: datetime,
        end_dt: datetime,
    ) -> list[dict]:
        """Search the store for events in a date range and tag the matches.

        Rows flagged by ``_contains_error`` are passed back unchanged; every
        other dict row is returned as a tagged copy, non-dict rows are dropped.
        """
        matches = self.store.search_events(query, start_dt, end_dt)
        if self._contains_error(matches):
            return matches
        tagged_matches = []
        for match in matches:
            if isinstance(match, dict):
                tagged_matches.append(self._tag_event(dict(match)))
        return tagged_matches
AppleCalendarProvider._tag_calendar method · python · L103-L108 (6 LOC)
connectors/providers/apple_provider.py
    def _tag_calendar(self, row: dict) -> dict:
        tagged = dict(row)
        tagged["provider"] = self.provider_name
        tagged["source_account"] = str(row.get("source", "") or "")
        tagged["calendar_id"] = str(row.get("name", "") or "")
        return tagged
About: code-quality intelligence by Repobility · https://repobility.com
AppleCalendarProvider._tag_event method · python · L110-L125 (16 LOC)
connectors/providers/apple_provider.py
    def _tag_event(self, row: dict) -> dict:
        tagged = dict(row)
        calendar_name = str(tagged.get("calendar", "") or "")
        source_account = self._calendar_source_map.get(calendar_name, "")
        if not source_account:
            # Lazy refresh in case events are fetched before list_calendars.
            self.list_calendars()
            source_account = self._calendar_source_map.get(calendar_name, "")

        native_id = str(tagged.get("uid", "") or "")
        tagged["provider"] = self.provider_name
        tagged["source_account"] = source_account
        tagged["calendar_id"] = calendar_name
        tagged["native_id"] = native_id
        tagged["unified_uid"] = f"{self.provider_name}:{native_id}" if native_id else ""
        return tagged
Microsoft365CalendarProvider.__init__ method · python · L19-L37 (19 LOC)
connectors/providers/m365_provider.py
    def __init__(
        self,
        connected: bool = False,
        list_calendars_fn: Callable[[], list[dict]] | None = None,
        get_events_fn: Callable[[datetime, datetime, Optional[list[str]]], list[dict]] | None = None,
        create_event_fn: Callable[..., dict] | None = None,
        update_event_fn: Callable[..., dict] | None = None,
        delete_event_fn: Callable[..., dict] | None = None,
        search_events_fn: Callable[[str, datetime, datetime], list[dict]] | None = None,
    ):
        self._connected = bool(connected)
        self._hooks = {
            "list_calendars": list_calendars_fn,
            "get_events": get_events_fn,
            "create_event": create_event_fn,
            "update_event": update_event_fn,
            "delete_event": delete_event_fn,
            "search_events": search_events_fn,
        }
Microsoft365CalendarProvider.list_calendars method · python · L45-L52 (8 LOC)
connectors/providers/m365_provider.py
    def list_calendars(self) -> list[dict]:
        """List calendars through the configured hook, tagged per row.

        Returns a one-element error list when the provider is not connected
        or no ``list_calendars`` hook is configured.
        """
        if not self.is_connected():
            return [self._not_connected_error()]
        list_hook = self._hooks["list_calendars"]
        if list_hook is None:
            return [self._not_configured_error("list calendars")]
        tagged_rows = []
        for entry in list_hook():
            if isinstance(entry, dict):
                tagged_rows.append(self._tag_calendar(dict(entry)))
        return tagged_rows
Microsoft365CalendarProvider.get_events method · python · L54-L66 (13 LOC)
connectors/providers/m365_provider.py
    def get_events(
        self,
        start_dt: datetime,
        end_dt: datetime,
        calendar_names: Optional[list[str]] = None,
    ) -> list[dict]:
        """Fetch events through the configured hook, tagged per row.

        The hook receives ``(start_dt, end_dt, calendar_names)`` positionally.
        Returns a one-element error list when the provider is not connected
        or no ``get_events`` hook is configured.
        """
        if not self.is_connected():
            return [self._not_connected_error()]
        events_hook = self._hooks["get_events"]
        if events_hook is None:
            return [self._not_configured_error("get events")]
        tagged_events = []
        for event in events_hook(start_dt, end_dt, calendar_names):
            if isinstance(event, dict):
                tagged_events.append(self._tag_event(dict(event)))
        return tagged_events
Microsoft365CalendarProvider.create_event method · python · L68-L94 (27 LOC)
connectors/providers/m365_provider.py
    def create_event(
        self,
        title: str,
        start_dt: datetime,
        end_dt: datetime,
        calendar_name: Optional[str] = None,
        location: Optional[str] = None,
        notes: Optional[str] = None,
        is_all_day: bool = False,
    ) -> dict:
        """Create an event through the configured hook and return it tagged.

        Returns an error dict when the provider is not connected or no
        ``create_event`` hook is configured; a hook result carrying a truthy
        ``"error"`` is returned unchanged.
        """
        if not self.is_connected():
            return self._not_connected_error()
        create_hook = self._hooks["create_event"]
        if create_hook is None:
            return self._not_configured_error("create events")
        event_fields = {
            "title": title,
            "start_dt": start_dt,
            "end_dt": end_dt,
            "calendar_name": calendar_name,
            "location": location,
            "notes": notes,
            "is_all_day": is_all_day,
        }
        created = create_hook(**event_fields)
        return created if created.get("error") else self._tag_event(dict(created))
Microsoft365CalendarProvider.update_event method · python · L96-L110 (15 LOC)
connectors/providers/m365_provider.py
    def update_event(
        self,
        event_uid: str,
        calendar_name: Optional[str] = None,
        **kwargs,
    ) -> dict:
        """Update an event through the configured hook and return it tagged.

        Returns an error dict when the provider is not connected or no
        ``update_event`` hook is configured; a hook result carrying a truthy
        ``"error"`` is returned unchanged.
        """
        if not self.is_connected():
            return self._not_connected_error()
        update_hook = self._hooks["update_event"]
        if update_hook is None:
            return self._not_configured_error("update events")
        updated = update_hook(event_uid=event_uid, calendar_name=calendar_name, **kwargs)
        if not updated.get("error"):
            return self._tag_event(dict(updated))
        return updated
Microsoft365CalendarProvider.delete_event method · python · L112-L125 (14 LOC)
connectors/providers/m365_provider.py
    def delete_event(self, event_uid: str, calendar_name: Optional[str] = None) -> dict:
        """Delete an event through the configured hook and tag the outcome.

        Returns an error dict when the provider is not connected or no
        ``delete_event`` hook is configured; a hook result carrying a truthy
        ``"error"`` is returned unchanged. Otherwise a copy of the result is
        returned with ``provider``, ``native_id`` and ``unified_uid`` set.
        """
        if not self.is_connected():
            return self._not_connected_error()
        delete_hook = self._hooks["delete_event"]
        if delete_hook is None:
            return self._not_configured_error("delete events")
        outcome = delete_hook(event_uid=event_uid, calendar_name=calendar_name)
        if outcome.get("error"):
            return outcome
        return {
            **outcome,
            "provider": self.provider_name,
            "native_id": event_uid,
            "unified_uid": f"{self.provider_name}:{event_uid}",
        }
Microsoft365CalendarProvider.search_events method · python · L127-L139 (13 LOC)
connectors/providers/m365_provider.py
    def search_events(
        self,
        query: str,
        start_dt: datetime,
        end_dt: datetime,
    ) -> list[dict]:
        """Search events through the configured hook, tagged per row.

        The hook receives ``(query, start_dt, end_dt)`` positionally. Returns
        a one-element error list when the provider is not connected or no
        ``search_events`` hook is configured.
        """
        if not self.is_connected():
            return [self._not_connected_error()]
        search_hook = self._hooks["search_events"]
        if search_hook is None:
            return [self._not_configured_error("search events")]
        tagged_matches = []
        for match in search_hook(query, start_dt, end_dt):
            if isinstance(match, dict):
                tagged_matches.append(self._tag_event(dict(match)))
        return tagged_matches
Repobility · MCP-ready · https://repobility.com
Microsoft365CalendarProvider._not_connected_error method · python · L142-L148 (7 LOC)
connectors/providers/m365_provider.py
    def _not_connected_error() -> dict:
        return {
            "error": (
                "Microsoft 365 connector is not connected for this runtime. "
                "Use provider_preference='auto' or target_provider='apple' to fallback."
            )
        }
Microsoft365CalendarProvider._not_configured_error method · python · L151-L157 (7 LOC)
connectors/providers/m365_provider.py
    def _not_configured_error(operation: str) -> dict:
        return {
            "error": (
                f"Microsoft 365 provider is connected but no adapter hook is configured for '{operation}'. "
                "Wire a provider bridge into Microsoft365CalendarProvider hooks."
            )
        }
Microsoft365CalendarProvider._tag_calendar method · python · L159-L164 (6 LOC)
connectors/providers/m365_provider.py
    def _tag_calendar(self, row: dict) -> dict:
        tagged = dict(row)
        tagged["provider"] = self.provider_name
        tagged["source_account"] = str(row.get("source_account", "") or "Microsoft 365")
        tagged["calendar_id"] = str(row.get("calendar_id", "") or row.get("name", "") or "")
        return tagged
Microsoft365CalendarProvider._tag_event method · python · L166-L174 (9 LOC)
connectors/providers/m365_provider.py
    def _tag_event(self, row: dict) -> dict:
        tagged = dict(row)
        native_id = str(tagged.get("native_id", "") or tagged.get("uid", "") or "")
        tagged["provider"] = self.provider_name
        tagged["source_account"] = str(tagged.get("source_account", "") or "Microsoft 365")
        tagged["calendar_id"] = str(tagged.get("calendar_id", "") or tagged.get("calendar", "") or "")
        tagged["native_id"] = native_id
        tagged["unified_uid"] = f"{self.provider_name}:{native_id}" if native_id else ""
        return tagged
ProviderRouter.decide_read method · python · L49-L88 (40 LOC)
connectors/router.py
    def decide_read(self, provider_preference: str = "auto") -> RouteDecision:
        pref = normalize_provider_name(provider_preference) or (provider_preference or "").strip().lower()
        has_apple = self.is_connected("apple")
        has_m365 = self.is_connected("microsoft_365")

        if pref == "both":
            if has_m365 and has_apple:
                return RouteDecision(["microsoft_365", "apple"], "microsoft_365", "apple", "explicit_both")
            if has_m365:
                return RouteDecision(["microsoft_365"], "microsoft_365", "", "both_requested_m365_only")
            if has_apple:
                return RouteDecision(["apple"], "apple", "", "both_requested_apple_only")
            return RouteDecision([], "", "", "no_provider_connected")

        if pref == "microsoft_365":
            if has_m365:
                fallback = "apple" if has_apple else ""
                providers = ["microsoft_365"] + (["apple"] if fallback else [])
                return R
ProviderRouter.decide_write method · python · L90-L132 (43 LOC)
connectors/router.py
    def decide_write(
        self,
        target_provider: str = "",
        calendar_name: str = "",
        provider_preference: str = "auto",
    ) -> RouteDecision:
        target = normalize_provider_name(target_provider)
        pref = normalize_provider_name(provider_preference)
        has_apple = self.is_connected("apple")
        has_m365 = self.is_connected("microsoft_365")

        if target:
            if self.is_connected(target):
                fallback = "apple" if target == "microsoft_365" and has_apple else ""
                if target == "apple" and has_m365:
                    fallback = "microsoft_365"
                providers = [target] + ([fallback] if fallback else [])
                return RouteDecision(providers, target, fallback, "explicit_target")
            fallback = "apple" if has_apple else ("microsoft_365" if has_m365 else "")
            providers = [fallback] if fallback else []
            reason = f"target_unavailable_fallback_{fallback}" if
ProviderRouter._looks_work_calendar method · python · L135-L140 (6 LOC)
connectors/router.py
    def _looks_work_calendar(calendar_name: str) -> bool:
        name = (calendar_name or "").lower()
        if not name:
            return False
        keywords = ("work", "office", "outlook", "exchange", "corp", "company", "team")
        return any(kw in name for kw in keywords)
chunk_text function · python · L13-L26 (14 LOC)
documents/ingestion.py
def chunk_text(text: str, chunk_size: int = 500, overlap: int = 50) -> list[str]:
    """Split *text* into overlapping chunks of whitespace-separated words.

    Args:
        text: The text to split.
        chunk_size: Maximum number of words per chunk.
        overlap: Number of words shared between consecutive chunks.

    Returns:
        ``[text]`` unchanged when the text has at most ``chunk_size`` words;
        otherwise a list of chunks, each made of up to ``chunk_size`` words
        re-joined with single spaces.

    Raises:
        ValueError: If the text needs splitting and ``chunk_size`` is not
            positive, or ``overlap`` is not smaller than ``chunk_size``
            (the window would never advance).
    """
    words = text.split()
    if len(words) <= chunk_size:
        return [text]

    # Validate only once splitting is actually required, so short inputs keep
    # returning [text] regardless of the other arguments.
    if chunk_size <= 0:
        raise ValueError(f"chunk_size must be positive, got {chunk_size}")
    step = chunk_size - overlap
    if step <= 0:
        raise ValueError(
            f"overlap ({overlap}) must be smaller than chunk_size ({chunk_size})"
        )

    chunks = []
    start = 0
    while start < len(words):
        end = start + chunk_size
        chunks.append(" ".join(words[start:end]))
        # Stop once a chunk reaches the last word; continuing would emit a
        # trailing chunk made only of words already present in this one.
        if end >= len(words):
            break
        start += step

    return chunks
Repobility · code-quality intelligence · https://repobility.com
_load_pdf function · python · L29-L43 (15 LOC)
documents/ingestion.py
def _load_pdf(file_path: Path) -> str:
    """Extract the text of a PDF, one page per line group.

    Pages whose extracted text is blank are dropped; the remaining page
    texts are joined with newlines.

    Raises:
        ImportError: If the ``pypdf`` package cannot be imported.
    """
    try:
        from pypdf import PdfReader
    except ImportError:
        raise ImportError("pypdf library is required for PDF support. Install with: pip install pypdf")

    # Lazily extract each page, keeping only pages with visible text.
    extracted = (page.extract_text() for page in PdfReader(file_path).pages)
    return "\n".join(content for content in extracted if content.strip())
_load_docx function · python · L46-L68 (23 LOC)
documents/ingestion.py
def _load_docx(file_path: Path) -> str:
    """Extract paragraph and table text from a DOCX file.

    Non-blank paragraphs are emitted first, followed by one line per table
    row with its non-blank cells joined by ``" | "``. All lines are joined
    with newlines.

    Raises:
        ImportError: If the ``python-docx`` package cannot be imported.
    """
    try:
        from docx import Document
    except ImportError:
        raise ImportError("python-docx library is required for DOCX support. Install with: pip install python-docx")

    document = Document(file_path)

    # Paragraph text is kept as-is; only blank paragraphs are skipped.
    lines = [para.text for para in document.paragraphs if para.text.strip()]

    # Each table row becomes a single line of its stripped, non-empty cells.
    for table in document.tables:
        for row in table.rows:
            cell_values = [cell.text.strip() for cell in row.cells]
            joined = " | ".join(value for value in cell_values if value)
            if joined:
                lines.append(joined)

    return "\n".join(lines)
load_text_file function · python · L71-L87 (17 LOC)
documents/ingestion.py
def load_text_file(path: Path) -> str:
    """Read a file's text, choosing the loader from the file extension.

    ``.pdf`` and ``.docx`` files (case-insensitive suffix) go through their
    dedicated loaders; every other file is read as UTF-8 text.

    Raises:
        ValueError: If *path* is a symlink, or the file is larger than
            ``MAX_FILE_SIZE_BYTES``.
    """
    if path.is_symlink():
        raise ValueError(f"Refusing to read symlink: {path}")

    size_bytes = path.stat().st_size
    if size_bytes > MAX_FILE_SIZE_BYTES:
        raise ValueError(f"File too large ({size_bytes} bytes, max {MAX_FILE_SIZE_BYTES}): {path.name}")

    # Map the lower-cased suffix to its loader; no entry means plain text.
    format_loaders = {".pdf": _load_pdf, ".docx": _load_docx}
    loader = format_loaders.get(path.suffix.lower())
    if loader is None:
        return path.read_text(encoding="utf-8")
    return loader(path)
‹ prev · page 4 / 11 · next ›