Function bodies 522 total
UnifiedCalendarService.list_calendars method · python · L192-L226 (35 LOC)connectors/calendar_unified.py
def list_calendars(
self,
provider_preference: str = "auto",
source_filter: str = "",
) -> list[dict]:
decision = self.router.decide_read(provider_preference=provider_preference)
if not decision.providers:
return [{"error": "No connected calendar providers available"}]
rows: list[dict] = []
errors: list[dict] = []
succeeded: set[str] = set()
for provider_name in decision.providers:
provider = self.router.get_provider(provider_name)
if provider is None:
continue
payload = provider.list_calendars()
if self._is_error_payload(payload):
errors.extend(payload if isinstance(payload, list) else [payload])
continue
succeeded.add(provider_name)
rows.extend(payload if isinstance(payload, list) else [payload])
rows = self._filter_source(rows, source_filter=source_filter)
UnifiedCalendarService.get_events method · python · L228-L271 (44 LOC)connectors/calendar_unified.py
def get_events(
self,
start_dt: datetime,
end_dt: datetime,
calendar_names: Optional[list[str]] = None,
provider_preference: str = "auto",
source_filter: str = "",
) -> list[dict]:
decision = self.router.decide_read(provider_preference=provider_preference)
if not decision.providers:
return [{"error": "No connected calendar providers available"}]
rows: list[dict] = []
errors: list[dict] = []
succeeded: set[str] = set()
for provider_name in decision.providers:
provider = self.router.get_provider(provider_name)
if provider is None:
continue
payload = provider.get_events(start_dt, end_dt, calendar_names=calendar_names)
if self._is_error_payload(payload):
errors.extend(payload if isinstance(payload, list) else [payload])
continue
succeeded.add(provider_name)
UnifiedCalendarService.search_events method · python · L273-L316 (44 LOC)connectors/calendar_unified.py
def search_events(
self,
query: str,
start_dt: datetime,
end_dt: datetime,
provider_preference: str = "auto",
source_filter: str = "",
) -> list[dict]:
decision = self.router.decide_read(provider_preference=provider_preference)
if not decision.providers:
return [{"error": "No connected calendar providers available"}]
rows: list[dict] = []
errors: list[dict] = []
succeeded: set[str] = set()
for provider_name in decision.providers:
provider = self.router.get_provider(provider_name)
if provider is None:
continue
payload = provider.search_events(query, start_dt, end_dt)
if self._is_error_payload(payload):
errors.extend(payload if isinstance(payload, list) else [payload])
continue
succeeded.add(provider_name)
provider_rows = payload if isinstance(payload, liUnifiedCalendarService._resolve_write_provider method · python · L318-L362 (45 LOC)connectors/calendar_unified.py
def _resolve_write_provider(
self,
event_uid: str = "",
target_provider: str = "",
calendar_name: str = "",
provider_preference: str = "auto",
) -> tuple[list[str], str, str, str]:
if target_provider:
decision = self.router.decide_write(
target_provider=target_provider,
calendar_name=calendar_name,
provider_preference=provider_preference,
)
native_id = event_uid
parsed = self._provider_from_prefixed_uid(event_uid)
if parsed:
_, native_id = parsed
return decision.providers, decision.preferred_provider, decision.fallback_provider, native_id
parsed = self._provider_from_prefixed_uid(event_uid)
if parsed:
provider, native_id = parsed
decision = self.router.decide_write(
target_provider=provider,
calendar_name=calendar_name,
UnifiedCalendarService.create_event method · python · L364-L406 (43 LOC)connectors/calendar_unified.py
def create_event(
self,
title: str,
start_dt: datetime,
end_dt: datetime,
calendar_name: Optional[str] = None,
location: Optional[str] = None,
notes: Optional[str] = None,
is_all_day: bool = False,
target_provider: str = "",
provider_preference: str = "auto",
) -> dict:
providers, preferred, _, _ = self._resolve_write_provider(
target_provider=target_provider,
calendar_name=calendar_name or "",
provider_preference=provider_preference,
)
if not providers:
return {"error": "No connected calendar providers available for write"}
errors: list[str] = []
for provider_name in providers:
provider = self.router.get_provider(provider_name)
if provider is None:
continue
result = provider.create_event(
title=title,
start_dt=start_dt,
UnifiedCalendarService.update_event method · python · L408-L440 (33 LOC)connectors/calendar_unified.py
def update_event(
self,
event_uid: str,
calendar_name: Optional[str] = None,
target_provider: str = "",
provider_preference: str = "auto",
**kwargs,
) -> dict:
providers, preferred, _, native_id = self._resolve_write_provider(
event_uid=event_uid,
target_provider=target_provider,
calendar_name=calendar_name or "",
provider_preference=provider_preference,
)
if not providers:
return {"error": "No connected calendar providers available for write"}
native_id = native_id or event_uid
errors: list[str] = []
for provider_name in providers:
provider = self.router.get_provider(provider_name)
if provider is None:
continue
result = provider.update_event(native_id, calendar_name=calendar_name, **kwargs)
if result.get("error"):
errors.append(f"{provider_nameUnifiedCalendarService.delete_event method · python · L442-L476 (35 LOC)connectors/calendar_unified.py
def delete_event(
self,
event_uid: str,
calendar_name: Optional[str] = None,
target_provider: str = "",
provider_preference: str = "auto",
) -> dict:
providers, preferred, _, native_id = self._resolve_write_provider(
event_uid=event_uid,
target_provider=target_provider,
calendar_name=calendar_name or "",
provider_preference=provider_preference,
)
if not providers:
return {"error": "No connected calendar providers available for write"}
native_id = native_id or event_uid
errors: list[str] = []
for provider_name in providers:
provider = self.router.get_provider(provider_name)
if provider is None:
continue
result = provider.delete_event(native_id, calendar_name=calendar_name)
if result.get("error"):
errors.append(f"{provider_name}: {result['error']}")
Repobility · MCP-ready · https://repobility.com
ClaudeM365Bridge._sanitize_for_prompt method · python · L14-L23 (10 LOC)connectors/claude_m365_bridge.py
def _sanitize_for_prompt(text: str, max_length: int = 500) -> str:
"""Sanitize user input before embedding in a Claude prompt."""
if text is None:
return ""
# Strip control characters (keep only printable + space)
sanitized = "".join(c for c in str(text) if c.isprintable() or c == " ")
# Truncate
if len(sanitized) > max_length:
sanitized = sanitized[:max_length] + "..."
return sanitizedClaudeM365Bridge.__init__ method · python · L25-L39 (15 LOC)connectors/claude_m365_bridge.py
def __init__(
self,
claude_bin: str = "claude",
mcp_config: str = "",
model: str = "sonnet",
timeout_seconds: int = 90,
detect_timeout_seconds: int = 5,
runner: Callable[..., subprocess.CompletedProcess] | None = None,
):
self.claude_bin = claude_bin
self.mcp_config = mcp_config
self.model = model
self.timeout_seconds = timeout_seconds
self.detect_timeout_seconds = detect_timeout_seconds
self._runner = runner or subprocess.runClaudeM365Bridge.is_connector_connected method · python · L41-L49 (9 LOC)connectors/claude_m365_bridge.py
def is_connector_connected(self) -> bool:
args = [self.claude_bin, "mcp", "list"]
if self.mcp_config:
args.extend(["--mcp-config", self.mcp_config])
proc = self._run(args, timeout=self.detect_timeout_seconds)
if proc is None or proc.returncode != 0:
return False
output = f"{proc.stdout or ''}\n{proc.stderr or ''}"
return bool(re.search(r"microsoft\s*365:.*\bconnected\b", output, flags=re.IGNORECASE))ClaudeM365Bridge.list_calendars method · python · L51-L65 (15 LOC)connectors/claude_m365_bridge.py
def list_calendars(self) -> list[dict]:
schema = {
"type": "object",
"properties": {"results": {"type": "array", "items": {"type": "object"}}},
"required": ["results"],
}
prompt = (
"Use only Microsoft 365 MCP connector tools to list Outlook/Exchange calendars. "
"Return calendar rows with useful fields such as name, calendar_id, source_account, type, and color "
"when available."
)
data = self._invoke_structured(prompt, schema)
if data.get("error"):
return [data]
return [dict(row) for row in data.get("results", []) if isinstance(row, dict)]ClaudeM365Bridge.get_events method · python · L67-L93 (27 LOC)connectors/claude_m365_bridge.py
def get_events(
self,
start_dt: datetime,
end_dt: datetime,
calendar_names: Optional[list[str]] = None,
) -> list[dict]:
schema = {
"type": "object",
"properties": {"results": {"type": "array", "items": {"type": "object"}}},
"required": ["results"],
}
filter_clause = (
"Limit to these calendars when possible: "
f"<user_calendar_names>{', '.join(self._sanitize_for_prompt(n) for n in calendar_names)}</user_calendar_names>. "
if calendar_names
else ""
)
prompt = (
"Use only Microsoft 365 MCP connector tools to get calendar events. "
f"Time range start={start_dt.isoformat()} end={end_dt.isoformat()}. "
f"{filter_clause}"
"Return results with fields like uid/native_id, title, start, end, calendar/calendar_id, source_account."
)
data = self._invoke_structured(prompt, ClaudeM365Bridge.search_events method · python · L95-L110 (16 LOC)connectors/claude_m365_bridge.py
def search_events(self, query: str, start_dt: datetime, end_dt: datetime) -> list[dict]:
schema = {
"type": "object",
"properties": {"results": {"type": "array", "items": {"type": "object"}}},
"required": ["results"],
}
prompt = (
"Use only Microsoft 365 MCP connector tools to search Outlook/Exchange calendar events by title text. "
f"query=<user_query>{self._sanitize_for_prompt(query)}</user_query>, "
f"start={start_dt.isoformat()}, end={end_dt.isoformat()}. "
"Return results with fields like uid/native_id, title, start, end, calendar/calendar_id, source_account."
)
data = self._invoke_structured(prompt, schema)
if data.get("error"):
return [data]
return [dict(row) for row in data.get("results", []) if isinstance(row, dict)]ClaudeM365Bridge.create_event method · python · L112-L141 (30 LOC)connectors/claude_m365_bridge.py
def create_event(
self,
title: str,
start_dt: datetime,
end_dt: datetime,
calendar_name: Optional[str] = None,
location: Optional[str] = None,
notes: Optional[str] = None,
is_all_day: bool = False,
) -> dict:
schema = {
"type": "object",
"properties": {"result": {"type": "object"}},
"required": ["result"],
}
prompt = (
"Use only Microsoft 365 MCP connector tools to create a calendar event in Outlook/Exchange. "
f"title=<user_title>{self._sanitize_for_prompt(title)}</user_title>, "
f"start={start_dt.isoformat()}, end={end_dt.isoformat()}, "
f"calendar_name=<user_calendar_name>{self._sanitize_for_prompt(calendar_name)}</user_calendar_name>, "
f"location=<user_location>{self._sanitize_for_prompt(location)}</user_location>, "
f"notes=<user_notes>{self._sanitize_for_prompt(notes, max_lengthClaudeM365Bridge.update_event method · python · L143-L169 (27 LOC)connectors/claude_m365_bridge.py
def update_event(self, event_uid: str, calendar_name: Optional[str] = None, **kwargs) -> dict:
schema = {
"type": "object",
"properties": {"result": {"type": "object"}},
"required": ["result"],
}
safe_kwargs = dict(kwargs)
if isinstance(safe_kwargs.get("start_dt"), datetime):
safe_kwargs["start_dt"] = safe_kwargs["start_dt"].isoformat()
if isinstance(safe_kwargs.get("end_dt"), datetime):
safe_kwargs["end_dt"] = safe_kwargs["end_dt"].isoformat()
# Sanitize user-supplied string values in kwargs
sanitized_kwargs = {}
for k, v in safe_kwargs.items():
sanitized_kwargs[k] = self._sanitize_for_prompt(v) if isinstance(v, str) else v
prompt = (
"Use only Microsoft 365 MCP connector tools to update an existing Outlook/Exchange calendar event. "
f"event_uid=<user_event_uid>{self._sanitize_for_prompt(event_uid)}</user_event_uid>Repobility · code-quality intelligence · https://repobility.com
ClaudeM365Bridge.delete_event method · python · L171-L190 (20 LOC)connectors/claude_m365_bridge.py
def delete_event(self, event_uid: str, calendar_name: Optional[str] = None) -> dict:
schema = {
"type": "object",
"properties": {
"status": {"type": "string"},
"event_uid": {"type": "string"},
"error": {"type": ["string", "null"]},
},
"required": ["status"],
}
prompt = (
"Use only Microsoft 365 MCP connector tools to delete an Outlook/Exchange calendar event. "
f"event_uid=<user_event_uid>{self._sanitize_for_prompt(event_uid)}</user_event_uid>, "
f"calendar_name=<user_calendar_name>{self._sanitize_for_prompt(calendar_name)}</user_calendar_name>. "
"Return status and event_uid."
)
data = self._invoke_structured(prompt, schema)
if data.get("error"):
return data
return dict(data)ClaudeM365Bridge._invoke_structured method · python · L192-L240 (49 LOC)connectors/claude_m365_bridge.py
def _invoke_structured(self, prompt: str, schema: dict) -> dict:
args = [
self.claude_bin,
"-p",
prompt,
"--output-format",
"json",
"--json-schema",
json.dumps(schema),
"--no-session-persistence",
"--disable-slash-commands",
"--model",
self.model,
]
if self.mcp_config:
args.extend(["--mcp-config", self.mcp_config])
proc = self._run(args, timeout=self.timeout_seconds)
if proc is None:
return {"error": "Failed to invoke Claude CLI"}
if proc.returncode != 0:
err = (proc.stderr or proc.stdout or "").strip()
return {"error": f"Claude bridge command failed: {err or 'unknown error'}"}
payload = self._parse_output_json(proc.stdout or "")
if payload is None:
return {"error": "Claude bridge returned invalid JSON"}
structured = paClaudeM365Bridge._run method · python · L242-L254 (13 LOC)connectors/claude_m365_bridge.py
def _run(self, args: list[str], timeout: int) -> subprocess.CompletedProcess | None:
try:
return self._runner(
args,
capture_output=True,
text=True,
timeout=timeout,
check=False,
)
except FileNotFoundError:
return None
except Exception:
return NoneClaudeM365Bridge._parse_output_json method · python · L257-L265 (9 LOC)connectors/claude_m365_bridge.py
def _parse_output_json(text: str) -> dict | None:
text = (text or "").strip()
if not text:
return None
try:
data = json.loads(text)
return data if isinstance(data, dict) else None
except json.JSONDecodeError:
return NoneClaudeM365Bridge._parse_first_json_object method · python · L268-L303 (36 LOC)connectors/claude_m365_bridge.py
def _parse_first_json_object(text: str) -> dict | None:
s = text or ""
n = len(s)
i = 0
while i < n:
if s[i] != "{":
i += 1
continue
start = i
depth = 0
in_string = False
escape = False
j = i
while j < n:
c = s[j]
if escape:
escape = False
elif c == "\\" and in_string:
escape = True
elif c == '"':
in_string = not in_string
elif not in_string:
if c == "{":
depth += 1
elif c == "}":
depth -= 1
if depth == 0:
candidate = s[start:j + 1]
try:
parsed = json.loads(candidate)
retCalendarProvider.get_events method · python · L22-L28 (7 LOC)connectors/provider_base.py
def get_events(
self,
start_dt: datetime,
end_dt: datetime,
calendar_names: Optional[list[str]] = None,
) -> list[dict]:
"""Get events within a date range."""CalendarProvider.create_event method · python · L31-L41 (11 LOC)connectors/provider_base.py
def create_event(
self,
title: str,
start_dt: datetime,
end_dt: datetime,
calendar_name: Optional[str] = None,
location: Optional[str] = None,
notes: Optional[str] = None,
is_all_day: bool = False,
) -> dict:
"""Create an event."""CalendarProvider.update_event method · python · L44-L50 (7 LOC)connectors/provider_base.py
def update_event(
self,
event_uid: str,
calendar_name: Optional[str] = None,
**kwargs,
) -> dict:
"""Update an existing event."""Repobility (the analyzer behind this table) · https://repobility.com
CalendarProvider.search_events method · python · L57-L63 (7 LOC)connectors/provider_base.py
def search_events(
self,
query: str,
start_dt: datetime,
end_dt: datetime,
) -> list[dict]:
"""Search events in a date range."""AppleCalendarProvider.list_calendars method · python · L22-L31 (10 LOC)connectors/providers/apple_provider.py
def list_calendars(self) -> list[dict]:
rows = self.store.list_calendars()
if self._contains_error(rows):
return rows
self._calendar_source_map = {
str(row.get("name", "")): str(row.get("source", "") or "")
for row in rows
if isinstance(row, dict)
}
return [self._tag_calendar(dict(row)) for row in rows if isinstance(row, dict)]AppleCalendarProvider.get_events method · python · L33-L42 (10 LOC)connectors/providers/apple_provider.py
def get_events(
self,
start_dt: datetime,
end_dt: datetime,
calendar_names: Optional[list[str]] = None,
) -> list[dict]:
rows = self.store.get_events(start_dt, end_dt, calendar_names=calendar_names)
if self._contains_error(rows):
return rows
return [self._tag_event(dict(row)) for row in rows if isinstance(row, dict)]AppleCalendarProvider.create_event method · python · L44-L65 (22 LOC)connectors/providers/apple_provider.py
def create_event(
self,
title: str,
start_dt: datetime,
end_dt: datetime,
calendar_name: Optional[str] = None,
location: Optional[str] = None,
notes: Optional[str] = None,
is_all_day: bool = False,
) -> dict:
result = self.store.create_event(
title=title,
start_dt=start_dt,
end_dt=end_dt,
calendar_name=calendar_name,
location=location,
notes=notes,
is_all_day=is_all_day,
)
if result.get("error"):
return result
return self._tag_event(dict(result))AppleCalendarProvider.update_event method · python · L67-L76 (10 LOC)connectors/providers/apple_provider.py
def update_event(
self,
event_uid: str,
calendar_name: Optional[str] = None,
**kwargs,
) -> dict:
result = self.store.update_event(event_uid, calendar_name=calendar_name, **kwargs)
if result.get("error"):
return result
return self._tag_event(dict(result))AppleCalendarProvider.delete_event method · python · L78-L86 (9 LOC)connectors/providers/apple_provider.py
def delete_event(self, event_uid: str, calendar_name: Optional[str] = None) -> dict:
result = self.store.delete_event(event_uid, calendar_name=calendar_name)
if result.get("error"):
return result
tagged = dict(result)
tagged["provider"] = self.provider_name
tagged["native_id"] = event_uid
tagged["unified_uid"] = f"{self.provider_name}:{event_uid}"
return taggedAppleCalendarProvider.search_events method · python · L88-L97 (10 LOC)connectors/providers/apple_provider.py
def search_events(
self,
query: str,
start_dt: datetime,
end_dt: datetime,
) -> list[dict]:
rows = self.store.search_events(query, start_dt, end_dt)
if self._contains_error(rows):
return rows
return [self._tag_event(dict(row)) for row in rows if isinstance(row, dict)]AppleCalendarProvider._tag_calendar method · python · L103-L108 (6 LOC)connectors/providers/apple_provider.py
def _tag_calendar(self, row: dict) -> dict:
tagged = dict(row)
tagged["provider"] = self.provider_name
tagged["source_account"] = str(row.get("source", "") or "")
tagged["calendar_id"] = str(row.get("name", "") or "")
return taggedAbout: code-quality intelligence by Repobility · https://repobility.com
AppleCalendarProvider._tag_event method · python · L110-L125 (16 LOC)connectors/providers/apple_provider.py
def _tag_event(self, row: dict) -> dict:
tagged = dict(row)
calendar_name = str(tagged.get("calendar", "") or "")
source_account = self._calendar_source_map.get(calendar_name, "")
if not source_account:
# Lazy refresh in case events are fetched before list_calendars.
self.list_calendars()
source_account = self._calendar_source_map.get(calendar_name, "")
native_id = str(tagged.get("uid", "") or "")
tagged["provider"] = self.provider_name
tagged["source_account"] = source_account
tagged["calendar_id"] = calendar_name
tagged["native_id"] = native_id
tagged["unified_uid"] = f"{self.provider_name}:{native_id}" if native_id else ""
return taggedMicrosoft365CalendarProvider.__init__ method · python · L19-L37 (19 LOC)connectors/providers/m365_provider.py
def __init__(
self,
connected: bool = False,
list_calendars_fn: Callable[[], list[dict]] | None = None,
get_events_fn: Callable[[datetime, datetime, Optional[list[str]]], list[dict]] | None = None,
create_event_fn: Callable[..., dict] | None = None,
update_event_fn: Callable[..., dict] | None = None,
delete_event_fn: Callable[..., dict] | None = None,
search_events_fn: Callable[[str, datetime, datetime], list[dict]] | None = None,
):
self._connected = bool(connected)
self._hooks = {
"list_calendars": list_calendars_fn,
"get_events": get_events_fn,
"create_event": create_event_fn,
"update_event": update_event_fn,
"delete_event": delete_event_fn,
"search_events": search_events_fn,
}Microsoft365CalendarProvider.list_calendars method · python · L45-L52 (8 LOC)connectors/providers/m365_provider.py
def list_calendars(self) -> list[dict]:
if not self.is_connected():
return [self._not_connected_error()]
hook = self._hooks["list_calendars"]
if hook is None:
return [self._not_configured_error("list calendars")]
rows = hook()
return [self._tag_calendar(dict(row)) for row in rows if isinstance(row, dict)]Microsoft365CalendarProvider.get_events method · python · L54-L66 (13 LOC)connectors/providers/m365_provider.py
def get_events(
self,
start_dt: datetime,
end_dt: datetime,
calendar_names: Optional[list[str]] = None,
) -> list[dict]:
if not self.is_connected():
return [self._not_connected_error()]
hook = self._hooks["get_events"]
if hook is None:
return [self._not_configured_error("get events")]
rows = hook(start_dt, end_dt, calendar_names)
return [self._tag_event(dict(row)) for row in rows if isinstance(row, dict)]Microsoft365CalendarProvider.create_event method · python · L68-L94 (27 LOC)connectors/providers/m365_provider.py
def create_event(
self,
title: str,
start_dt: datetime,
end_dt: datetime,
calendar_name: Optional[str] = None,
location: Optional[str] = None,
notes: Optional[str] = None,
is_all_day: bool = False,
) -> dict:
if not self.is_connected():
return self._not_connected_error()
hook = self._hooks["create_event"]
if hook is None:
return self._not_configured_error("create events")
row = hook(
title=title,
start_dt=start_dt,
end_dt=end_dt,
calendar_name=calendar_name,
location=location,
notes=notes,
is_all_day=is_all_day,
)
if row.get("error"):
return row
return self._tag_event(dict(row))Microsoft365CalendarProvider.update_event method · python · L96-L110 (15 LOC)connectors/providers/m365_provider.py
def update_event(
self,
event_uid: str,
calendar_name: Optional[str] = None,
**kwargs,
) -> dict:
if not self.is_connected():
return self._not_connected_error()
hook = self._hooks["update_event"]
if hook is None:
return self._not_configured_error("update events")
row = hook(event_uid=event_uid, calendar_name=calendar_name, **kwargs)
if row.get("error"):
return row
return self._tag_event(dict(row))Microsoft365CalendarProvider.delete_event method · python · L112-L125 (14 LOC)connectors/providers/m365_provider.py
def delete_event(self, event_uid: str, calendar_name: Optional[str] = None) -> dict:
if not self.is_connected():
return self._not_connected_error()
hook = self._hooks["delete_event"]
if hook is None:
return self._not_configured_error("delete events")
row = hook(event_uid=event_uid, calendar_name=calendar_name)
if row.get("error"):
return row
tagged = dict(row)
tagged["provider"] = self.provider_name
tagged["native_id"] = event_uid
tagged["unified_uid"] = f"{self.provider_name}:{event_uid}"
return taggedMicrosoft365CalendarProvider.search_events method · python · L127-L139 (13 LOC)connectors/providers/m365_provider.py
def search_events(
self,
query: str,
start_dt: datetime,
end_dt: datetime,
) -> list[dict]:
if not self.is_connected():
return [self._not_connected_error()]
hook = self._hooks["search_events"]
if hook is None:
return [self._not_configured_error("search events")]
rows = hook(query, start_dt, end_dt)
return [self._tag_event(dict(row)) for row in rows if isinstance(row, dict)]Repobility · MCP-ready · https://repobility.com
Microsoft365CalendarProvider._not_connected_error method · python · L142-L148 (7 LOC)connectors/providers/m365_provider.py
def _not_connected_error() -> dict:
return {
"error": (
"Microsoft 365 connector is not connected for this runtime. "
"Use provider_preference='auto' or target_provider='apple' to fallback."
)
}Microsoft365CalendarProvider._not_configured_error method · python · L151-L157 (7 LOC)connectors/providers/m365_provider.py
def _not_configured_error(operation: str) -> dict:
return {
"error": (
f"Microsoft 365 provider is connected but no adapter hook is configured for '{operation}'. "
"Wire a provider bridge into Microsoft365CalendarProvider hooks."
)
}Microsoft365CalendarProvider._tag_calendar method · python · L159-L164 (6 LOC)connectors/providers/m365_provider.py
def _tag_calendar(self, row: dict) -> dict:
tagged = dict(row)
tagged["provider"] = self.provider_name
tagged["source_account"] = str(row.get("source_account", "") or "Microsoft 365")
tagged["calendar_id"] = str(row.get("calendar_id", "") or row.get("name", "") or "")
return taggedMicrosoft365CalendarProvider._tag_event method · python · L166-L174 (9 LOC)connectors/providers/m365_provider.py
def _tag_event(self, row: dict) -> dict:
tagged = dict(row)
native_id = str(tagged.get("native_id", "") or tagged.get("uid", "") or "")
tagged["provider"] = self.provider_name
tagged["source_account"] = str(tagged.get("source_account", "") or "Microsoft 365")
tagged["calendar_id"] = str(tagged.get("calendar_id", "") or tagged.get("calendar", "") or "")
tagged["native_id"] = native_id
tagged["unified_uid"] = f"{self.provider_name}:{native_id}" if native_id else ""
return taggedProviderRouter.decide_read method · python · L49-L88 (40 LOC)connectors/router.py
def decide_read(self, provider_preference: str = "auto") -> RouteDecision:
pref = normalize_provider_name(provider_preference) or (provider_preference or "").strip().lower()
has_apple = self.is_connected("apple")
has_m365 = self.is_connected("microsoft_365")
if pref == "both":
if has_m365 and has_apple:
return RouteDecision(["microsoft_365", "apple"], "microsoft_365", "apple", "explicit_both")
if has_m365:
return RouteDecision(["microsoft_365"], "microsoft_365", "", "both_requested_m365_only")
if has_apple:
return RouteDecision(["apple"], "apple", "", "both_requested_apple_only")
return RouteDecision([], "", "", "no_provider_connected")
if pref == "microsoft_365":
if has_m365:
fallback = "apple" if has_apple else ""
providers = ["microsoft_365"] + (["apple"] if fallback else [])
return RProviderRouter.decide_write method · python · L90-L132 (43 LOC)connectors/router.py
def decide_write(
self,
target_provider: str = "",
calendar_name: str = "",
provider_preference: str = "auto",
) -> RouteDecision:
target = normalize_provider_name(target_provider)
pref = normalize_provider_name(provider_preference)
has_apple = self.is_connected("apple")
has_m365 = self.is_connected("microsoft_365")
if target:
if self.is_connected(target):
fallback = "apple" if target == "microsoft_365" and has_apple else ""
if target == "apple" and has_m365:
fallback = "microsoft_365"
providers = [target] + ([fallback] if fallback else [])
return RouteDecision(providers, target, fallback, "explicit_target")
fallback = "apple" if has_apple else ("microsoft_365" if has_m365 else "")
providers = [fallback] if fallback else []
reason = f"target_unavailable_fallback_{fallback}" ifProviderRouter._looks_work_calendar method · python · L135-L140 (6 LOC)connectors/router.py
def _looks_work_calendar(calendar_name: str) -> bool:
name = (calendar_name or "").lower()
if not name:
return False
keywords = ("work", "office", "outlook", "exchange", "corp", "company", "team")
return any(kw in name for kw in keywords)chunk_text function · python · L13-L26 (14 LOC)documents/ingestion.py
def chunk_text(text: str, chunk_size: int = 500, overlap: int = 50) -> list[str]:
words = text.split()
if len(words) <= chunk_size:
return [text]
chunks = []
start = 0
while start < len(words):
end = start + chunk_size
chunk = " ".join(words[start:end])
chunks.append(chunk)
start = end - overlap
return chunksRepobility · code-quality intelligence · https://repobility.com
_load_pdf function · python · L29-L43 (15 LOC)documents/ingestion.py
def _load_pdf(file_path: Path) -> str:
"""Load text from a PDF file."""
try:
from pypdf import PdfReader
except ImportError:
raise ImportError("pypdf library is required for PDF support. Install with: pip install pypdf")
reader = PdfReader(file_path)
text_parts = []
for page in reader.pages:
page_text = page.extract_text()
if page_text.strip(): # Only add non-empty pages
text_parts.append(page_text)
return "\n".join(text_parts)_load_docx function · python · L46-L68 (23 LOC)documents/ingestion.py
def _load_docx(file_path: Path) -> str:
"""Load text from a DOCX file."""
try:
from docx import Document
except ImportError:
raise ImportError("python-docx library is required for DOCX support. Install with: pip install python-docx")
doc = Document(file_path)
text_parts = []
# Extract paragraph text
for paragraph in doc.paragraphs:
if paragraph.text.strip():
text_parts.append(paragraph.text)
# Extract table text
for table in doc.tables:
for row in table.rows:
row_text = " | ".join(cell.text.strip() for cell in row.cells if cell.text.strip())
if row_text:
text_parts.append(row_text)
return "\n".join(text_parts)load_text_file function · python · L71-L87 (17 LOC)documents/ingestion.py
def load_text_file(path: Path) -> str:
"""Load text from a file based on its extension."""
if path.is_symlink():
raise ValueError(f"Refusing to read symlink: {path}")
file_size = path.stat().st_size
if file_size > MAX_FILE_SIZE_BYTES:
raise ValueError(f"File too large ({file_size} bytes, max {MAX_FILE_SIZE_BYTES}): {path.name}")
suffix = path.suffix.lower()
if suffix == ".pdf":
return _load_pdf(path)
elif suffix == ".docx":
return _load_docx(path)
else:
return path.read_text(encoding="utf-8")