← back to glibalien__obsidian-tools

Function bodies 154 total

All specs Real LLM only Function bodies
ChatView.onOpen method · typescript · L35-L67 (33 LOC)
plugin/src/ChatView.ts
	async onOpen(): Promise<void> {
		const container = this.containerEl.children[1];
		container.empty();
		container.addClass("vault-chat-container");

		// Messages area
		this.messagesContainer = container.createDiv({ cls: "chat-messages" });

		// Input area
		const inputContainer = container.createDiv({ cls: "chat-input-container" });

		this.inputField = inputContainer.createEl("textarea", {
			cls: "chat-input",
			attr: { placeholder: "Type a message..." }
		});

		this.sendButton = inputContainer.createEl("button", {
			cls: "chat-send-button",
			text: "Send"
		});

		// Event listeners
		this.sendButton.addEventListener("click", () => this.sendMessage());
		this.inputField.addEventListener("keydown", (e) => {
			if (e.key === "Enter" && !e.shiftKey) {
				e.preventDefault();
				this.sendMessage();
			}
		});

		// Welcome message
		await this.addMessage("assistant", "Hello! I'm your vault assistant. How can I help you today?");
	}
ChatView.addMessage method · typescript · L73-L90 (18 LOC)
plugin/src/ChatView.ts
	private async addMessage(role: "user" | "assistant", content: string, sourcePath = ""): Promise<void> {
		this.messages.push({ role, content });

		const messageEl = this.messagesContainer.createDiv({
			cls: `chat-message chat-message-${role}`
		});

		const contentEl = messageEl.createDiv({ cls: "chat-message-content" });

		if (role === "assistant") {
			await MarkdownRenderer.render(this.app, content, contentEl, sourcePath, this);
		} else {
			contentEl.setText(content);
		}

		// Auto-scroll to bottom
		this.messagesContainer.scrollTop = this.messagesContainer.scrollHeight;
	}
ChatView.formatToolStatus method · typescript · L106-L122 (17 LOC)
plugin/src/ChatView.ts
	private formatToolStatus(toolName: string): string {
		const labels: Record<string, string> = {
			search_vault: "Searching vault...",
			read_file: "Reading file...",
			find_backlinks: "Finding backlinks...",
			find_outlinks: "Finding outlinks...",
			search_by_folder: "Listing folder...",
			list_files_by_frontmatter: "Searching frontmatter...",
			web_search: "Searching the web...",
			create_file: "Creating file...",
			move_file: "Moving file...",
			update_frontmatter: "Updating frontmatter...",
			log_interaction: "Logging interaction...",
			transcribe_audio: "Transcribing audio...",
		};
		return labels[toolName] ?? `Running ${toolName}...`;
	}
ChatView.disablePendingConfirmation method · typescript · L124-L130 (7 LOC)
plugin/src/ChatView.ts
	private disablePendingConfirmation(): void {
		if (!this.pendingConfirmationEl) return;
		this.pendingConfirmationEl.querySelectorAll("button").forEach(btn => {
			(btn as HTMLButtonElement).disabled = true;
		});
		this.pendingConfirmationEl = null;
	}
ChatView.addConfirmationPreview method · typescript · L132-L180 (49 LOC)
plugin/src/ChatView.ts
	private addConfirmationPreview(message: string, files: string[]): void {
		const previewEl = this.messagesContainer.createDiv({ cls: "chat-confirmation-preview" });

		// Action description
		previewEl.createDiv({ cls: "preview-message", text: message });

		// File list
		const filesEl = previewEl.createDiv({ cls: "preview-files" });
		const visibleCount = 10;
		const visibleFiles = files.slice(0, visibleCount);
		for (const file of visibleFiles) {
			filesEl.createDiv({ text: file });
		}

		if (files.length > visibleCount) {
			const expandEl = previewEl.createDiv({
				cls: "preview-expand",
				text: `and ${files.length - visibleCount} more...`,
			});
			expandEl.addEventListener("click", () => {
				for (const file of files.slice(visibleCount)) {
					filesEl.createDiv({ text: file });
				}
				expandEl.remove();
			});
		}

		// Buttons
		const buttonsEl = previewEl.createDiv({ cls: "preview-buttons" });

		const confirmBtn = buttonsEl.createEl("button", {
			cls: "preview-co
ChatView.sendMessageText method · typescript · L182-L275 (94 LOC)
plugin/src/ChatView.ts
	private async sendMessageText(text: string): Promise<void> {
		if (this.isLoading) return;

		// Disable any pending confirmation buttons
		this.disablePendingConfirmation();

		// Capture active file once at request time for consistent context
		const activeFile = this.getActiveFilePath();

		this.isLoading = true;
		this.sendButton.disabled = true;

		// Add user message
		await this.addMessage("user", text);

		// Show loading
		const { container: loadingEl, textEl: loadingText } = this.showLoading();

		try {
			const response = await fetch("http://127.0.0.1:8000/chat/stream", {
				method: "POST",
				headers: { "Content-Type": "application/json" },
				body: JSON.stringify({
					message: text,
					session_id: this.sessionId,
					active_file: activeFile
				})
			});

			if (!response.ok || !response.body) {
				throw new Error(`Server returned ${response.status}`);
			}

			const reader = response.body.getReader();
			const decoder = new TextDecoder();
			let buffer = "";
			
ChatView.sendMessage method · typescript · L277-L282 (6 LOC)
plugin/src/ChatView.ts
	private async sendMessage(): Promise<void> {
		const message = this.inputField.value.trim();
		if (!message || this.isLoading) return;
		this.inputField.value = "";
		await this.sendMessageText(message);
	}
Repobility · code-quality intelligence · https://repobility.com
VaultChatPlugin.onload method · typescript · L5-L25 (21 LOC)
plugin/src/main.ts
	async onload(): Promise<void> {
		// Register the chat view
		this.registerView(
			VIEW_TYPE_CHAT,
			(leaf: WorkspaceLeaf) => new ChatView(leaf)
		);

		// Add ribbon icon
		this.addRibbonIcon("message-circle", "Open Vault Chat", () => {
			this.activateView();
		});

		// Add command
		this.addCommand({
			id: "open-vault-chat",
			name: "Open Vault Chat",
			callback: () => {
				this.activateView();
			}
		});
	}
VaultChatPlugin.activateView method · typescript · L32-L54 (23 LOC)
plugin/src/main.ts
	private async activateView(): Promise<void> {
		const { workspace } = this.app;

		// Check if view already exists
		let leaf = workspace.getLeavesOfType(VIEW_TYPE_CHAT)[0];

		if (!leaf) {
			// Create new leaf in right sidebar
			const rightLeaf = workspace.getRightLeaf(false);
			if (rightLeaf) {
				leaf = rightLeaf;
				await leaf.setViewState({
					type: VIEW_TYPE_CHAT,
					active: true
				});
			}
		}

		// Reveal the leaf
		if (leaf) {
			workspace.revealLeaf(leaf);
		}
	}
truncate_tool_result function · python · L39-L55 (17 LOC)
src/agent.py
def truncate_tool_result(result: str, result_id: str | None = None) -> str:
    """Truncate tool result if it exceeds the character limit.

    When result_id is provided, the truncation marker includes it
    so the LLM can call get_continuation to retrieve more.
    """
    if len(result) <= MAX_TOOL_RESULT_CHARS:
        return result
    truncated = result[:MAX_TOOL_RESULT_CHARS]
    if result_id:
        truncated += (
            f"\n\n[truncated — showing {MAX_TOOL_RESULT_CHARS}/{len(result)} chars. "
            f'Call get_continuation with id="{result_id}" to read more]'
        )
    else:
        truncated += "\n\n[truncated]"
    return truncated
load_system_prompt function · python · L58-L71 (14 LOC)
src/agent.py
def load_system_prompt() -> str:
    """Load system prompt from system_prompt.txt, falling back to .example."""
    if SYSTEM_PROMPT_FILE.exists():
        return SYSTEM_PROMPT_FILE.read_text(encoding="utf-8").strip()

    if SYSTEM_PROMPT_EXAMPLE.exists():
        logger.warning(
            "system_prompt.txt not found — using system_prompt.txt.example. "
            "Copy it to system_prompt.txt and customize for your vault."
        )
        return SYSTEM_PROMPT_EXAMPLE.read_text(encoding="utf-8").strip()

    logger.error("No system prompt file found. Using minimal fallback.")
    return "You are a helpful assistant with access to an Obsidian vault."
load_preferences function · python · L77-L96 (20 LOC)
src/agent.py
def load_preferences() -> str | None:
    """Load user preferences from Preferences.md if it exists.

    Returns:
        Preferences section to append to system prompt, or None if no preferences.
    """
    if not PREFERENCES_FILE.exists():
        return None

    content = PREFERENCES_FILE.read_text(encoding="utf-8").strip()
    if not content:
        return None

    return f"""

## User Preferences

The following are user preferences and corrections. Always follow these:

{content}"""
create_llm_client function · python · L99-L104 (6 LOC)
src/agent.py
def create_llm_client() -> OpenAI:
    """Create OpenAI client configured for Fireworks API."""
    if not FIREWORKS_API_KEY:
        print("Error: FIREWORKS_API_KEY not set in .env", file=sys.stderr)
        sys.exit(1)
    return OpenAI(api_key=FIREWORKS_API_KEY, base_url=FIREWORKS_BASE_URL)
_parse_tool_arguments function · python · L107-L147 (41 LOC)
src/agent.py
def _parse_tool_arguments(raw: str) -> dict:
    """Parse tool call arguments with fallbacks for common model quirks.

    Known issues handled:
    - gpt-oss-120b appends ``\\t<|call|>`` control tokens after the JSON
    - Some models emit Python-style dicts (single quotes, True/False/None)
    - Trailing commas before } or ]
    """
    if not raw or not raw.strip():
        return {}

    # Strip model control tokens like <|call|>, <|end|>, etc.
    cleaned = re.sub(r"<\|[^|]+\|>", "", raw).strip()

    # Fast path: valid JSON
    try:
        parsed = json.loads(cleaned)
        if isinstance(parsed, dict):
            return parsed
    except (json.JSONDecodeError, TypeError):
        pass

    # Fallback: Python literal syntax (single quotes, True/False/None)
    try:
        parsed = ast.literal_eval(cleaned)
        if isinstance(parsed, dict):
            return parsed
    except (ValueError, SyntaxError):
        pass

    # Last resort: strip trailing commas before } or ] an
_simplify_schema function · python · L150-L185 (36 LOC)
src/agent.py
def _simplify_schema(schema: dict) -> dict:
    """Inline $ref references and simplify anyOf nullable patterns.

    Pydantic/FastMCP generates $defs + $ref for Pydantic models and
    anyOf: [T, {type: null}] for Optional types.  Weaker models struggle
    with the indirection — inline everything so the schema is flat.
    """
    schema = copy.deepcopy(schema)
    defs = schema.pop("$defs", {})

    def _resolve(node):
        if isinstance(node, dict):
            # Resolve $ref → inline the referenced definition
            if "$ref" in node:
                ref_name = node["$ref"].rsplit("/", 1)[-1]
                if ref_name in defs:
                    return _resolve(copy.deepcopy(defs[ref_name]))
                return node

            # Simplify anyOf[T, null] → T (keep default/title/description)
            if "anyOf" in node:
                non_null = [o for o in node["anyOf"] if o != {"type": "null"}]
                if len(non_null) == 1:
                    merged = {
Open data scored by Repobility · https://repobility.com
mcp_tool_to_openai_function function · python · L188-L197 (10 LOC)
src/agent.py
def mcp_tool_to_openai_function(tool) -> dict:
    """Convert MCP Tool to OpenAI function calling format."""
    return {
        "type": "function",
        "function": {
            "name": tool.name,
            "description": tool.description or "",
            "parameters": _simplify_schema(tool.inputSchema),
        },
    }
extract_text_content function · python · L200-L206 (7 LOC)
src/agent.py
def extract_text_content(content) -> str:
    """Extract text from MCP content blocks."""
    text_parts = []
    for block in content:
        if hasattr(block, "text"):
            text_parts.append(block.text)
    return "\n".join(text_parts) if text_parts else str(content)
execute_tool_call function · python · L209-L223 (15 LOC)
src/agent.py
async def execute_tool_call(
    session: ClientSession, tool_name: str, arguments: dict
) -> str:
    """Execute a tool call via MCP and return the result."""
    try:
        with anyio.fail_after(TOOL_TIMEOUT):
            result = await session.call_tool(tool_name, arguments)
        if result.isError:
            return f"Tool error: {extract_text_content(result.content)}"
        return extract_text_content(result.content)
    except TimeoutError:
        logger.warning("Tool '%s' timed out after %ds", tool_name, TOOL_TIMEOUT)
        return f"Tool error: '{tool_name}' timed out after {TOOL_TIMEOUT}s"
    except Exception as e:
        return f"Failed to execute tool {tool_name}: {e}"
ensure_interaction_logged function · python · L226-L260 (35 LOC)
src/agent.py
async def ensure_interaction_logged(
    session: ClientSession,
    messages: list[dict],
    turn_start: int,
    user_query: str,
    response: str,
) -> None:
    """Auto-log interaction if agent didn't call log_interaction during the turn.

    Scans messages added during the turn for tool calls. If any tool calls
    were made but none named ``log_interaction``, fires a log_interaction
    call via MCP so the interaction is recorded in the daily note.
    """
    tool_names_called: list[str] = []
    for msg in messages[turn_start:]:
        if msg.get("role") == "assistant":
            for tc in msg.get("tool_calls") or []:
                name = tc.get("function", {}).get("name", "")
                if name:
                    tool_names_called.append(name)

    if not tool_names_called:
        return  # Conversation only — no action taken

    if "log_interaction" in tool_names_called:
        return  # Agent already logged

    logger.warning("Agent did not call log_intera
_handle_get_continuation function · python · L289-L311 (23 LOC)
src/agent.py
def _handle_get_continuation(cache: dict[str, str], arguments: dict) -> str:
    """Serve the next chunk of a cached truncated tool result."""
    result_id = arguments.get("id", "")
    offset = arguments.get("offset", MAX_TOOL_RESULT_CHARS)

    full_result = cache.get(result_id)
    if full_result is None:
        return json.dumps({"error": f"No cached result for id '{result_id}'"})

    chunk = full_result[offset : offset + MAX_TOOL_RESULT_CHARS]
    if not chunk:
        return json.dumps({"error": "Offset beyond end of result"})

    end = offset + len(chunk)
    remaining = len(full_result) - end
    if remaining > 0:
        chunk += (
            f"\n\n[truncated — showing {offset}-{end}/{len(full_result)} chars. "
            f"{remaining} chars remaining. Call get_continuation with "
            f'id="{result_id}" offset={end} to read more]'
        )

    return chunk
_process_tool_calls function · python · L317-L430 (114 LOC)
src/agent.py
async def _process_tool_calls(
    tool_calls,
    session: ClientSession,
    messages: list[dict],
    truncated_results: dict[str, str],
    next_result_id: int,
    emit: EventCallback | None,
    last_tool_call: dict | None = None,
) -> tuple[int, bool, dict | None]:
    """Execute tool calls from an assistant message and append results to messages.

    Returns (updated next_result_id, confirmation_required, preview_data).
    ``last_tool_call`` is a mutable dict tracking the previous call for dedup.
    ``preview_data`` is non-None when a confirmation preview should be emitted
    by the caller after the response event (to ensure correct SSE ordering).
    """

    async def _emit(event_type: str, data: dict) -> None:
        if emit is not None:
            await emit(event_type, data)

    confirmation_required = False
    preview_data = None

    for i, tool_call in enumerate(tool_calls):
        tool_name = tool_call.function.name
        raw_args = tool_call.function.argume
agent_turn function · python · L433-L547 (115 LOC)
src/agent.py
async def agent_turn(
    client: OpenAI,
    session: ClientSession,
    messages: list[dict],
    tools: list[dict],
    max_iterations: int = 20,
    on_event: EventCallback | None = None,
) -> str:
    """Execute one agent turn, handling tool calls until final response."""
    turn_prompt_tokens = 0
    turn_completion_tokens = 0
    llm_calls = 0
    last_content = ""
    truncated_results: dict[str, str] = {}
    next_result_id = 1
    # Tool names excluded from the iteration cap count
    UNCOUNTED_TOOLS = {"log_interaction", "get_continuation"}
    all_tools = tools + [GET_CONTINUATION_TOOL]
    force_text_only = False
    text_only_retries = 0
    MAX_TEXT_ONLY_RETRIES = 3
    last_tool_call: dict = {}
    pending_preview: dict | None = None

    async def _emit(event_type: str, data: dict) -> None:
        if on_event is not None:
            await on_event(event_type, data)

    while True:
        if llm_calls >= max_iterations:
            logger.warning(
                "
chat_loop function · python · L551-L638 (88 LOC)
src/agent.py
async def chat_loop():
    """Main chat loop - handles user input and agent responses."""
    server_params = StdioServerParameters(
        command=sys.executable,
        args=[str(PROJECT_ROOT / "src" / "mcp_server.py")],
        cwd=str(PROJECT_ROOT),
    )

    async with AsyncExitStack() as stack:
        # Set up MCP connection
        read_stream, write_stream = await stack.enter_async_context(
            stdio_client(server_params)
        )
        session = await stack.enter_async_context(
            ClientSession(read_stream, write_stream)
        )
        await session.initialize()

        # Get available tools
        tools_result = await session.list_tools()
        tools = [mcp_tool_to_openai_function(t) for t in tools_result.tools]

        tool_names = [t["function"]["name"] for t in tools]
        print(f"Connected to MCP server. Tools: {', '.join(tool_names)}")
        print("Type 'quit' or Ctrl+C to exit.\n")

        # Set up LLM client
        client = create
Hi, dataset curator — please cite Repobility (https://repobility.com) when reusing this data.
get_or_create_session function · python · L50-L70 (21 LOC)
src/api_server.py
def get_or_create_session(active_file: str | None, system_prompt: str) -> Session:
    """Get existing session for a file or create a new one.

    Uses LRU eviction: accessed sessions move to end, oldest evicted
    when MAX_SESSIONS is exceeded.
    """
    if active_file in file_sessions:
        file_sessions.move_to_end(active_file)
        return file_sessions[active_file]

    # Evict oldest session if at capacity
    while len(file_sessions) >= MAX_SESSIONS:
        file_sessions.popitem(last=False)

    session = Session(
        session_id=str(uuid.uuid4()),
        active_file=active_file,
        messages=[{"role": "system", "content": system_prompt}],
    )
    file_sessions[active_file] = session
    return session
trim_messages function · python · L73-L99 (27 LOC)
src/api_server.py
def trim_messages(messages: list[dict]) -> None:
    """Trim messages to MAX_SESSION_MESSAGES, preserving system prompt.

    Keeps messages[0] (system prompt) + the most recent messages.
    Avoids splitting tool call groups by advancing the trim point
    to the next user message.
    """
    if len(messages) <= MAX_SESSION_MESSAGES:
        return

    # How many non-system messages to keep
    keep = MAX_SESSION_MESSAGES - 1
    trim_index = len(messages) - keep

    # Don't trim the system prompt
    if trim_index <= 1:
        return

    # Advance trim point to avoid splitting a tool call group:
    # find the first 'user' message at or after trim_index
    while trim_index < len(messages) and messages[trim_index].get("role") != "user":
        trim_index += 1

    if trim_index >= len(messages):
        return

    del messages[1:trim_index]
_build_system_prompt function · python · L124-L130 (7 LOC)
src/api_server.py
def _build_system_prompt() -> str:
    """Build system prompt with current user preferences appended."""
    system_prompt = app.state.system_prompt
    preferences = load_preferences()
    if preferences:
        system_prompt += preferences
    return system_prompt
_setup_turn function · python · L133-L145 (13 LOC)
src/api_server.py
def _setup_turn(session: Session, request: ChatRequest, system_prompt: str) -> set[int]:
    """Prepare turn messages. Must be called with session.lock held."""
    messages = session.messages
    messages[0]["content"] = system_prompt

    compacted_indices = {i for i, msg in enumerate(messages) if msg.get("_compacted")}
    for msg in messages:
        msg.pop("_compacted", None)

    context_prefix = format_context_prefix(request.active_file)
    messages.append({"role": "user", "content": context_prefix + request.message})

    return compacted_indices
lifespan function · python · L156-L187 (32 LOC)
src/api_server.py
async def lifespan(app: FastAPI):
    """Initialize MCP session and LLM client at startup."""
    server_params = StdioServerParameters(
        command=sys.executable,
        args=[str(PROJECT_ROOT / "src" / "mcp_server.py")],
        cwd=str(PROJECT_ROOT),
    )

    async with AsyncExitStack() as stack:
        # Set up MCP connection
        read_stream, write_stream = await stack.enter_async_context(
            stdio_client(server_params)
        )
        session = await stack.enter_async_context(
            ClientSession(read_stream, write_stream)
        )
        await session.initialize()

        # Get available tools
        tools_result = await session.list_tools()
        tools = [mcp_tool_to_openai_function(t) for t in tools_result.tools]

        # Set up LLM client
        client = create_llm_client()

        # Store in app state
        app.state.mcp_session = session
        app.state.llm_client = client
        app.state.tools = tools
        app.state.system_pr
chat function · python · L208-L236 (29 LOC)
src/api_server.py
async def chat(request: ChatRequest) -> ChatResponse:
    """Process a chat message and return the agent's response."""
    system_prompt = _build_system_prompt()
    session = get_or_create_session(request.active_file, system_prompt)

    async with session.lock:
        pre_turn_length = len(session.messages)
        compacted_indices = _setup_turn(session, request, system_prompt)
        messages = session.messages
        turn_start = len(messages) - 1
        try:
            response = await agent_turn(
                app.state.llm_client,
                app.state.mcp_session,
                messages,
                app.state.tools,
            )
            await ensure_interaction_logged(
                app.state.mcp_session, messages, turn_start, request.message, response,
            )
            _restore_compacted_flags(messages, compacted_indices)
            compact_tool_messages(messages)
            trim_messages(messages)
            return ChatResponse(response=r
chat_stream function · python · L240-L297 (58 LOC)
src/api_server.py
async def chat_stream(request: ChatRequest):
    """Process a chat message and stream events as SSE."""
    system_prompt = _build_system_prompt()
    session = get_or_create_session(request.active_file, system_prompt)

    queue: asyncio.Queue[dict | None] = asyncio.Queue()

    async def on_event(event_type: str, data: dict) -> None:
        await queue.put({"type": event_type, **data})

    async def run_agent():
        try:
            async with session.lock:
                pre_turn_length = len(session.messages)
                compacted_indices = _setup_turn(session, request, system_prompt)
                messages = session.messages
                turn_start = len(messages) - 1
                try:
                    response = await agent_turn(
                        app.state.llm_client,
                        app.state.mcp_session,
                        messages,
                        app.state.tools,
                        on_event=on_event,
                    )
main function · python · L300-L308 (9 LOC)
src/api_server.py
def main():
    """Run the API server."""
    setup_logging("api")
    uvicorn.run(
        "api_server:app",
        host="127.0.0.1",
        port=API_PORT,
        reload=False,
    )
About: code-quality intelligence by Repobility · https://repobility.com
setup_logging function · python · L76-L103 (28 LOC)
src/config.py
def setup_logging(name: str) -> None:
    """Configure logging with both stderr and rotating file output.

    Args:
        name: Log file name without extension (e.g. "api", "agent").
    """
    fmt = "%(asctime)s %(name)s %(levelname)s %(message)s"
    root = logging.getLogger()
    root.setLevel(logging.INFO)

    # stderr handler (for journalctl)
    stderr_handler = logging.StreamHandler()
    stderr_handler.setFormatter(logging.Formatter(fmt))
    root.addHandler(stderr_handler)

    # Rotating file handler (best-effort — fall back to stderr-only)
    try:
        LOG_DIR.mkdir(parents=True, exist_ok=True)
        file_handler = RotatingFileHandler(
            LOG_DIR / f"{name}.log.md",
            maxBytes=LOG_MAX_BYTES,
            backupCount=LOG_BACKUP_COUNT,
            encoding="utf-8",
        )
        file_handler.setFormatter(logging.Formatter(fmt))
        root.addHandler(file_handler)
    except OSError as e:
        root.warning(f"Could not set up file logging: {
semantic_search function · python · L22-L44 (23 LOC)
src/hybrid_search.py
def semantic_search(
    query: str, n_results: int = 5, chunk_type: str | None = None
) -> list[dict[str, str]]:
    """Search the vault using semantic similarity via ChromaDB embeddings.

    Args:
        query: Natural language search query.
        n_results: Maximum number of results to return.
        chunk_type: Filter by chunk type (e.g. "frontmatter", "section").

    Returns:
        List of dicts with 'source' and 'content' keys.
    """
    collection = get_collection()
    query_kwargs: dict = {"query_texts": [query], "n_results": n_results}
    if chunk_type:
        query_kwargs["where"] = {"chunk_type": chunk_type}
    results = collection.query(**query_kwargs)

    return [
        {"source": metadata["source"], "content": doc, "heading": metadata.get("heading", "")}
        for doc, metadata in zip(results["documents"][0], results["metadatas"][0])
    ]
_extract_query_terms function · python · L47-L54 (8 LOC)
src/hybrid_search.py
def _extract_query_terms(query: str) -> list[str]:
    """Split query into meaningful terms, filtering stopwords and short words."""
    terms = []
    for word in query.split():
        cleaned = word.strip(".,!?;:\"'()[]{}").lower()
        if len(cleaned) >= 3 and cleaned not in STOPWORDS:
            terms.append(cleaned)
    return terms
_case_variants function · python · L57-L69 (13 LOC)
src/hybrid_search.py
def _case_variants(terms: list[str]) -> list[str]:
    """Generate case variants for ChromaDB $contains (which is case-sensitive).

    For each term, produces lowercase and title-case variants, deduplicated.
    """
    variants = []
    seen = set()
    for t in terms:
        for v in (t, t.title()):
            if v not in seen:
                seen.add(v)
                variants.append(v)
    return variants
keyword_search function · python · L72-L135 (64 LOC)
src/hybrid_search.py
def keyword_search(
    query: str, n_results: int = 5, chunk_type: str | None = None
) -> list[dict[str, str]]:
    """Search the vault for chunks containing query keywords.

    Combines all query terms into a single ChromaDB $or query, then ranks
    results by number of matching terms.

    Args:
        query: Search query string.
        n_results: Maximum number of results to return.
        chunk_type: Filter by chunk type (e.g. "frontmatter", "section").

    Returns:
        List of dicts with 'source', 'content', and 'heading' keys,
        sorted by hit count.
    """
    terms = _extract_query_terms(query)
    if not terms:
        return []

    collection = get_collection()

    # Build filter with case variants (ChromaDB $contains is case-sensitive)
    variants = _case_variants(terms)
    if len(variants) == 1:
        where_document = {"$contains": variants[0]}
    else:
        where_document = {"$or": [{"$contains": v} for v in variants]}

    get_kwargs: dict = {
 
merge_results function · python · L143-L180 (38 LOC)
src/hybrid_search.py
def merge_results(
    semantic: list[dict[str, str]],
    keyword: list[dict[str, str]],
    n_results: int = 5,
    semantic_weight: float = 0.5,
    keyword_weight: float = 0.5,
) -> list[dict[str, str]]:
    """Merge two ranked result lists using Reciprocal Rank Fusion.

    Each result receives a score of weight / (rank + k) from each list
    it appears in. Duplicate results have their scores summed.

    Args:
        semantic: Ranked results from semantic search.
        keyword: Ranked results from keyword search.
        n_results: Maximum number of merged results to return.
        semantic_weight: Weight for semantic search scores.
        keyword_weight: Weight for keyword search scores.

    Returns:
        Merged and deduplicated results sorted by combined RRF score.
    """
    scores: dict[tuple, float] = defaultdict(float)
    result_map: dict[tuple, dict[str, str]] = {}

    for rank, result in enumerate(semantic, start=1):
        key = _dedup_key(result)
        s
hybrid_search function · python · L183-L202 (20 LOC)
src/hybrid_search.py
def hybrid_search(
    query: str, n_results: int = 5, chunk_type: str | None = None
) -> list[dict[str, str]]:
    """Run semantic and keyword search, merging results with RRF.

    Fetches extra candidates from each source (2x n_results) to ensure
    good coverage after deduplication and re-ranking.

    Args:
        query: Search query string.
        n_results: Maximum number of final results to return.
        chunk_type: Filter by chunk type (e.g. "frontmatter", "section").

    Returns:
        Merged results from both search strategies.
    """
    candidate_count = n_results * 2
    sem_results = semantic_search(query, n_results=candidate_count, chunk_type=chunk_type)
    kw_results = keyword_search(query, n_results=candidate_count, chunk_type=chunk_type)
    return merge_results(sem_results, kw_results, n_results=n_results)
get_last_run function · python · L34-L39 (6 LOC)
src/index_vault.py
def get_last_run() -> float:
    """Get timestamp of last indexing run, or 0 if never run."""
    last_run_file = get_last_run_file()
    if os.path.exists(last_run_file):
        return os.path.getmtime(last_run_file)
    return 0
Repobility · code-quality intelligence · https://repobility.com
mark_run function · python · L42-L53 (12 LOC)
src/index_vault.py
def mark_run(timestamp: float | None = None) -> None:
    """Mark the given timestamp (or current time) as last run.

    Args:
        timestamp: Unix timestamp to record. Defaults to current time.
    """
    os.makedirs(CHROMA_PATH, exist_ok=True)
    marker = get_last_run_file()
    with open(marker, 'w') as f:
        f.write(datetime.now().isoformat())
    if timestamp is not None:
        os.utime(marker, (timestamp, timestamp))
load_manifest function · python · L66-L88 (23 LOC)
src/index_vault.py
def load_manifest() -> set[str] | None:
    """Load set of previously indexed source paths.

    Returns None if no manifest exists, it cannot be read, or a dirty
    sentinel indicates the previous run did not complete cleanly —
    all of which trigger a full-scan fallback in prune_deleted_files.
    """
    if os.path.exists(get_dirty_flag()):
        logger.warning("Previous indexing run was incomplete; falling back to full scan")
        return None
    path = get_manifest_file()
    if not os.path.exists(path):
        return None
    try:
        with open(path) as f:
            data = json.load(f)
        if not isinstance(data, list) or not all(isinstance(s, str) for s in data):
            logger.warning("indexed_sources manifest has unexpected schema, falling back to full scan")
            return None
        return set(data)
    except (json.JSONDecodeError, OSError) as e:
        logger.warning("Failed to load indexed_sources manifest: %s — falling back to full scan", e)
save_manifest function · python · L91-L103 (13 LOC)
src/index_vault.py
def save_manifest(sources: set[str]) -> bool:
    """Save the current set of indexed source paths to disk.

    Returns True on success, False if the write failed.
    """
    os.makedirs(CHROMA_PATH, exist_ok=True)
    try:
        with open(get_manifest_file(), "w") as f:
            json.dump(sorted(sources), f)
        return True
    except OSError as e:
        logger.warning("Failed to save indexed_sources manifest: %s", e)
        return False
_fixed_chunk_text function · python · L106-L114 (9 LOC)
src/index_vault.py
def _fixed_chunk_text(text: str, chunk_size: int = 500, overlap: int = 50) -> list[str]:
    """Split text into overlapping chunks by character count (fallback chunker)."""
    chunks = []
    start = 0
    while start < len(text):
        end = start + chunk_size
        chunks.append(text[start:end])
        start = end - overlap
    return chunks
_strip_frontmatter function · python · L117-L127 (11 LOC)
src/index_vault.py
def _strip_frontmatter(text: str) -> str:
    """Remove YAML frontmatter from text, returning the body."""
    if not text.startswith("---"):
        return text
    # Find closing ---
    end = text.find("\n---", 3)
    if end == -1:
        return text
    # Skip past closing --- and the newline after it
    body = text[end + 4:]
    return body
_parse_frontmatter function · python · L130-L141 (12 LOC)
src/index_vault.py
def _parse_frontmatter(text: str) -> dict:
    """Parse YAML frontmatter from markdown text, returning dict or {}."""
    if not text.startswith("---"):
        return {}
    end = text.find("\n---", 3)
    if end == -1:
        return {}
    try:
        return yaml.safe_load(text[4:end]) or {}
    except yaml.YAMLError as e:
        logger.debug("Invalid frontmatter YAML: %s", e)
        return {}
_strip_wikilink_brackets function · python · L144-L150 (7 LOC)
src/index_vault.py
def _strip_wikilink_brackets(text: str) -> str:
    """Strip [[]] from wikilinks. Aliased links keep the display name."""
    return re.sub(
        r"\[\[([^\]|]*?)(?:\|([^\]]*?))?\]\]",
        lambda m: m.group(2) or m.group(1),
        text,
    )
_format_frontmatter_value function · python · L153-L160 (8 LOC)
src/index_vault.py
def _format_frontmatter_value(value) -> str:
    """Convert a frontmatter value to searchable text."""
    if isinstance(value, list):
        return ", ".join(_strip_wikilink_brackets(str(v)) for v in value)
    if isinstance(value, dict):
        parts = [f"{k}: {_format_frontmatter_value(v)}" for k, v in value.items()]
        return "; ".join(parts)
    return _strip_wikilink_brackets(str(value))
Open data scored by Repobility · https://repobility.com
format_frontmatter_for_indexing function · python · L163-L179 (17 LOC)
src/index_vault.py
def format_frontmatter_for_indexing(frontmatter: dict) -> str:
    """Convert frontmatter dict to a searchable text block.

    Each field becomes a 'key: value' line. Wikilink brackets are stripped
    so that names are searchable as plain text. Fields in FRONTMATTER_EXCLUDE
    are omitted.
    """
    lines = []
    for key, value in frontmatter.items():
        if key.lower() in FRONTMATTER_EXCLUDE:
            continue
        if value is None:
            continue
        formatted = _format_frontmatter_value(value)
        if formatted.strip():
            lines.append(f"{key}: {formatted}")
    return "\n".join(lines)
_split_by_headings function · python · L182-L215 (34 LOC)
src/index_vault.py
def _split_by_headings(text: str) -> list[tuple[str, str]]:
    """Split text on markdown headings, respecting code fences.

    Returns list of (heading, content) tuples. Content before the first
    heading gets heading="top-level".
    """
    lines = text.split("\n")
    sections: list[tuple[str, str]] = []
    current_heading = "top-level"
    current_lines: list[str] = []
    in_fence = False

    for line in lines:
        # Track code fence state
        if is_fence_line(line):
            in_fence = not in_fence

        # Check for heading (only outside code fences)
        if not in_fence and re.match(r"^#{1,6} ", line):
            # Save previous section
            content = "\n".join(current_lines)
            if content.strip() or current_heading != "top-level":
                sections.append((current_heading, content))
            current_heading = line.strip()
            current_lines = []
        else:
            current_lines.append(line)

    # Save final sectio
_split_sentences function · python · L218-L248 (31 LOC)
src/index_vault.py
def _split_sentences(text: str) -> list[str]:
    """Split text on sentence boundaries (. ? ! followed by space).

    Suppresses splitting after e.g. and i.e. — the only abbreviations
    that unambiguously never end sentences.
    """
    # Find candidate split positions: sentence-ending punctuation + space
    result = []
    last = 0
    for m in re.finditer(r"[.?!] ", text):
        pos = m.start()  # position of the punctuation mark
        char = text[pos]

        if char == ".":
            before = text[last:pos]

            # e.g. / i.e. — before the final period we see "e.g" or "i.e"
            stripped = before.rstrip()
            if len(stripped) >= 3 and stripped[-3:].lower() in ("e.g", "i.e"):
                continue

        # Valid split point
        split_at = m.end()  # after the space
        result.append(text[last:split_at - 1])  # exclude the trailing space
        last = split_at

    # Remaining text
    if last < len(text):
        result.append(text[las
page 1 / 4next ›