Function bodies 154 total
ChatView.onOpen method · typescript · L35-L67 (33 LOC)plugin/src/ChatView.ts
/**
 * Builds the chat UI when the view is opened.
 *
 * Clears the second child of `containerEl` (presumably the view's content
 * element — confirm against the host API), creates the message list, the
 * textarea and the send button, wires up sending on click and on plain
 * Enter, and finally posts the assistant's welcome message.
 */
async onOpen(): Promise<void> {
  const root = this.containerEl.children[1];
  root.empty();
  root.addClass("vault-chat-container");

  // Scrollable message history.
  this.messagesContainer = root.createDiv({ cls: "chat-messages" });

  // Composer row: textarea followed by the send button.
  const composer = root.createDiv({ cls: "chat-input-container" });
  this.inputField = composer.createEl("textarea", {
    cls: "chat-input",
    attr: { placeholder: "Type a message..." }
  });
  this.sendButton = composer.createEl("button", {
    cls: "chat-send-button",
    text: "Send"
  });

  // Clicking the button sends the current draft.
  this.sendButton.addEventListener("click", () => {
    void this.sendMessage();
  });

  // Enter sends; Shift+Enter keeps the default behaviour (a newline).
  this.inputField.addEventListener("keydown", (event) => {
    if (event.key !== "Enter" || event.shiftKey) {
      return;
    }
    event.preventDefault();
    void this.sendMessage();
  });

  await this.addMessage("assistant", "Hello! I'm your vault assistant. How can I help you today?");
}
// ChatView.addMessage method · typescript · L73-L90 (18 LOC)plugin/src/ChatView.ts
/**
 * Records a message in the in-memory history and appends it to the chat log.
 *
 * Assistant messages are rendered through `MarkdownRenderer.render`; user
 * messages are inserted as plain text. The log is scrolled to the bottom
 * afterwards.
 *
 * @param role - Who authored the message.
 * @param content - Message body.
 * @param sourcePath - Passed through to the Markdown renderer (defaults to "").
 */
private async addMessage(role: "user" | "assistant", content: string, sourcePath = ""): Promise<void> {
  this.messages.push({ role, content });

  const bubble = this.messagesContainer.createDiv({
    cls: `chat-message chat-message-${role}`
  });
  const body = bubble.createDiv({ cls: "chat-message-content" });

  if (role !== "assistant") {
    body.setText(content);
  } else {
    await MarkdownRenderer.render(this.app, content, body, sourcePath, this);
  }

  // Keep the newest message in view.
  const log = this.messagesContainer;
  log.scrollTop = log.scrollHeight;
}
// ChatView.formatToolStatus method · typescript · L106-L122 (17 LOC)plugin/src/ChatView.ts
/**
 * Maps a tool name to the short status line shown while that tool runs.
 *
 * Known tools get a hand-written label; any other name falls back to a
 * generic `Running <name>...` message.
 *
 * @param toolName - Tool identifier to look up.
 * @returns Human-readable progress text.
 */
private formatToolStatus(toolName: string): string {
  // A Map only answers for the keys listed here. Indexing a plain object
  // literal with a name such as "toString" or "constructor" would find the
  // inherited Object.prototype member and return a function instead of
  // falling through to the generic label.
  const labels = new Map<string, string>([
    ["search_vault", "Searching vault..."],
    ["read_file", "Reading file..."],
    ["find_backlinks", "Finding backlinks..."],
    ["find_outlinks", "Finding outlinks..."],
    ["search_by_folder", "Listing folder..."],
    ["list_files_by_frontmatter", "Searching frontmatter..."],
    ["web_search", "Searching the web..."],
    ["create_file", "Creating file..."],
    ["move_file", "Moving file..."],
    ["update_frontmatter", "Updating frontmatter..."],
    ["log_interaction", "Logging interaction..."],
    ["transcribe_audio", "Transcribing audio..."],
  ]);
  return labels.get(toolName) ?? `Running ${toolName}...`;
}
// ChatView.disablePendingConfirmation method · typescript · L124-L130 (7 LOC)plugin/src/ChatView.ts
/**
 * Disables every button inside the pending confirmation element, if there is
 * one, and then forgets that element.
 */
private disablePendingConfirmation(): void {
  const pending = this.pendingConfirmationEl;
  if (pending) {
    for (const button of Array.from(pending.querySelectorAll("button"))) {
      button.disabled = true;
    }
    this.pendingConfirmationEl = null;
  }
}
// ChatView.addConfirmationPreview method · typescript · L132-L180 (49 LOC)plugin/src/ChatView.ts
private addConfirmationPreview(message: string, files: string[]): void {
const previewEl = this.messagesContainer.createDiv({ cls: "chat-confirmation-preview" });
// Action description
previewEl.createDiv({ cls: "preview-message", text: message });
// File list
const filesEl = previewEl.createDiv({ cls: "preview-files" });
const visibleCount = 10;
const visibleFiles = files.slice(0, visibleCount);
for (const file of visibleFiles) {
filesEl.createDiv({ text: file });
}
if (files.length > visibleCount) {
const expandEl = previewEl.createDiv({
cls: "preview-expand",
text: `and ${files.length - visibleCount} more...`,
});
expandEl.addEventListener("click", () => {
for (const file of files.slice(visibleCount)) {
filesEl.createDiv({ text: file });
}
expandEl.remove();
});
}
// Buttons
const buttonsEl = previewEl.createDiv({ cls: "preview-buttons" });
const confirmBtn = buttonsEl.createEl("button", {
cls: "preview-coChatView.sendMessageText method · typescript · L182-L275 (94 LOC)plugin/src/ChatView.ts
private async sendMessageText(text: string): Promise<void> {
if (this.isLoading) return;
// Disable any pending confirmation buttons
this.disablePendingConfirmation();
// Capture active file once at request time for consistent context
const activeFile = this.getActiveFilePath();
this.isLoading = true;
this.sendButton.disabled = true;
// Add user message
await this.addMessage("user", text);
// Show loading
const { container: loadingEl, textEl: loadingText } = this.showLoading();
try {
const response = await fetch("http://127.0.0.1:8000/chat/stream", {
method: "POST",
headers: { "Content-Type": "application/json" },
body: JSON.stringify({
message: text,
session_id: this.sessionId,
active_file: activeFile
})
});
if (!response.ok || !response.body) {
throw new Error(`Server returned ${response.status}`);
}
const reader = response.body.getReader();
const decoder = new TextDecoder();
let buffer = "";
ChatView.sendMessage method · typescript · L277-L282 (6 LOC)plugin/src/ChatView.ts
/**
 * Sends the trimmed contents of the input field.
 *
 * Does nothing when the draft is empty or a request is already in flight;
 * otherwise clears the field and hands the text to `sendMessageText`.
 */
private async sendMessage(): Promise<void> {
  const draft = this.inputField.value.trim();
  if (this.isLoading || draft === "") {
    return;
  }
  this.inputField.value = "";
  await this.sendMessageText(draft);
}
// Repobility · code-quality intelligence · https://repobility.com
VaultChatPlugin.onload method · typescript · L5-L25 (21 LOC)plugin/src/main.ts
/**
 * Plugin entry point: registers the chat view type and exposes two ways to
 * open it — a ribbon icon and a command — both of which call `activateView`.
 */
async onload(): Promise<void> {
  // Shared by the ribbon icon and the command.
  const openChat = (): void => {
    this.activateView();
  };

  this.registerView(VIEW_TYPE_CHAT, (leaf: WorkspaceLeaf) => new ChatView(leaf));

  this.addRibbonIcon("message-circle", "Open Vault Chat", openChat);

  this.addCommand({
    id: "open-vault-chat",
    name: "Open Vault Chat",
    callback: openChat
  });
}
// VaultChatPlugin.activateView method · typescript · L32-L54 (23 LOC)plugin/src/main.ts
/**
 * Brings the chat view to the front.
 *
 * Reuses the first existing leaf of the chat view type when there is one.
 * Otherwise it asks the workspace for a right-sidebar leaf, switches that
 * leaf to the chat view, and reveals it. If no leaf can be obtained the call
 * is a no-op.
 */
private async activateView(): Promise<void> {
  const { workspace } = this.app;

  const [existing] = workspace.getLeavesOfType(VIEW_TYPE_CHAT);
  if (existing) {
    workspace.revealLeaf(existing);
    return;
  }

  const sidebarLeaf = workspace.getRightLeaf(false);
  if (!sidebarLeaf) {
    return;
  }
  await sidebarLeaf.setViewState({
    type: VIEW_TYPE_CHAT,
    active: true
  });
  workspace.revealLeaf(sidebarLeaf);
}
// truncate_tool_result function · python · L39-L55 (17 LOC)src/agent.py
def truncate_tool_result(result: str, result_id: str | None = None) -> str:
    """Cap a tool result at MAX_TOOL_RESULT_CHARS characters.

    Results within the limit are returned untouched. Longer results are cut
    to the limit and a marker is appended: with ``result_id`` the marker
    tells the LLM how to fetch the rest via get_continuation, without it the
    marker is a plain ``[truncated]`` note.
    """
    total = len(result)
    if total <= MAX_TOOL_RESULT_CHARS:
        return result

    if result_id:
        marker = (
            f"\n\n[truncated — showing {MAX_TOOL_RESULT_CHARS}/{total} chars. "
            f'Call get_continuation with id="{result_id}" to read more]'
        )
    else:
        marker = "\n\n[truncated]"
    return result[:MAX_TOOL_RESULT_CHARS] + marker
# load_system_prompt function · python · L58-L71 (14 LOC)src/agent.py
def load_system_prompt() -> str:
    """Return the system prompt text.

    Preference order: SYSTEM_PROMPT_FILE, then SYSTEM_PROMPT_EXAMPLE (with a
    warning), then a hard-coded one-line fallback (with an error log).
    File contents are read as UTF-8 and stripped.
    """
    if not SYSTEM_PROMPT_FILE.exists():
        if not SYSTEM_PROMPT_EXAMPLE.exists():
            logger.error("No system prompt file found. Using minimal fallback.")
            return "You are a helpful assistant with access to an Obsidian vault."
        logger.warning(
            "system_prompt.txt not found — using system_prompt.txt.example. "
            "Copy it to system_prompt.txt and customize for your vault."
        )
        return SYSTEM_PROMPT_EXAMPLE.read_text(encoding="utf-8").strip()
    return SYSTEM_PROMPT_FILE.read_text(encoding="utf-8").strip()
# load_preferences function · python · L77-L96 (20 LOC)src/agent.py
def load_preferences() -> str | None:
    """Load user preferences from PREFERENCES_FILE if it exists.

    The file is read as UTF-8 and stripped of surrounding whitespace.

    Returns:
        A "## User Preferences" section (heading, one instruction line, then
        the file content) meant to be appended to the system prompt, or None
        when the file is missing or contains only whitespace.
    """
    if not PREFERENCES_FILE.exists():
        return None
    content = PREFERENCES_FILE.read_text(encoding="utf-8").strip()
    if not content:
        return None
    # The section starts with a newline so it can be concatenated directly
    # onto an existing prompt.
    # NOTE(review): this listing strips blank lines, so the real template may
    # contain blank lines that are not visible here — confirm before editing.
    return f"""
## User Preferences
The following are user preferences and corrections. Always follow these:
{content}"""
# create_llm_client function · python · L99-L104 (6 LOC)src/agent.py
def create_llm_client() -> OpenAI:
    """Build an OpenAI client pointed at FIREWORKS_BASE_URL.

    When FIREWORKS_API_KEY is empty, prints an error to stderr and exits the
    process with status 1.
    """
    if FIREWORKS_API_KEY:
        return OpenAI(api_key=FIREWORKS_API_KEY, base_url=FIREWORKS_BASE_URL)
    print("Error: FIREWORKS_API_KEY not set in .env", file=sys.stderr)
    sys.exit(1)
# _parse_tool_arguments function · python · L107-L147 (41 LOC)src/agent.py
def _parse_tool_arguments(raw: str) -> dict:
"""Parse tool call arguments with fallbacks for common model quirks.
Known issues handled:
- gpt-oss-120b appends ``\\t<|call|>`` control tokens after the JSON
- Some models emit Python-style dicts (single quotes, True/False/None)
- Trailing commas before } or ]
"""
if not raw or not raw.strip():
return {}
# Strip model control tokens like <|call|>, <|end|>, etc.
cleaned = re.sub(r"<\|[^|]+\|>", "", raw).strip()
# Fast path: valid JSON
try:
parsed = json.loads(cleaned)
if isinstance(parsed, dict):
return parsed
except (json.JSONDecodeError, TypeError):
pass
# Fallback: Python literal syntax (single quotes, True/False/None)
try:
parsed = ast.literal_eval(cleaned)
if isinstance(parsed, dict):
return parsed
except (ValueError, SyntaxError):
pass
# Last resort: strip trailing commas before } or ] an_simplify_schema function · python · L150-L185 (36 LOC)src/agent.py
def _simplify_schema(schema: dict) -> dict:
"""Inline $ref references and simplify anyOf nullable patterns.
Pydantic/FastMCP generates $defs + $ref for Pydantic models and
anyOf: [T, {type: null}] for Optional types. Weaker models struggle
with the indirection — inline everything so the schema is flat.
"""
schema = copy.deepcopy(schema)
defs = schema.pop("$defs", {})
def _resolve(node):
if isinstance(node, dict):
# Resolve $ref → inline the referenced definition
if "$ref" in node:
ref_name = node["$ref"].rsplit("/", 1)[-1]
if ref_name in defs:
return _resolve(copy.deepcopy(defs[ref_name]))
return node
# Simplify anyOf[T, null] → T (keep default/title/description)
if "anyOf" in node:
non_null = [o for o in node["anyOf"] if o != {"type": "null"}]
if len(non_null) == 1:
merged = {Open data scored by Repobility · https://repobility.com
mcp_tool_to_openai_function function · python · L188-L197 (10 LOC)src/agent.py
def mcp_tool_to_openai_function(tool) -> dict:
    """Describe an MCP tool in the OpenAI function-calling shape.

    The tool's input schema is passed through ``_simplify_schema`` first; a
    missing description becomes the empty string.
    """
    function_spec = {
        "name": tool.name,
        "description": tool.description or "",
        "parameters": _simplify_schema(tool.inputSchema),
    }
    return {"type": "function", "function": function_spec}
# extract_text_content function · python · L200-L206 (7 LOC)src/agent.py
def extract_text_content(content) -> str:
    """Join the ``text`` of every content block that has one.

    Blocks without a ``text`` attribute are ignored. When no block carries
    text at all, the string form of ``content`` itself is returned.
    """
    texts = [block.text for block in content if hasattr(block, "text")]
    if not texts:
        return str(content)
    return "\n".join(texts)
# execute_tool_call function · python · L209-L223 (15 LOC)src/agent.py
async def execute_tool_call(
    session: ClientSession, tool_name: str, arguments: dict
) -> str:
    """Run one MCP tool call and return its outcome as text.

    The call is bounded by TOOL_TIMEOUT via ``anyio.fail_after``. Failures
    are never raised to the caller; they are reported in the returned string:
    a ``Tool error: ...`` message for tool-reported errors and timeouts, and
    ``Failed to execute tool ...`` for any other exception.
    """
    try:
        with anyio.fail_after(TOOL_TIMEOUT):
            outcome = await session.call_tool(tool_name, arguments)
        text = extract_text_content(outcome.content)
        return f"Tool error: {text}" if outcome.isError else text
    except TimeoutError:
        logger.warning("Tool '%s' timed out after %ds", tool_name, TOOL_TIMEOUT)
        return f"Tool error: '{tool_name}' timed out after {TOOL_TIMEOUT}s"
    except Exception as e:
        return f"Failed to execute tool {tool_name}: {e}"
# ensure_interaction_logged function · python · L226-L260 (35 LOC)src/agent.py
async def ensure_interaction_logged(
session: ClientSession,
messages: list[dict],
turn_start: int,
user_query: str,
response: str,
) -> None:
"""Auto-log interaction if agent didn't call log_interaction during the turn.
Scans messages added during the turn for tool calls. If any tool calls
were made but none named ``log_interaction``, fires a log_interaction
call via MCP so the interaction is recorded in the daily note.
"""
tool_names_called: list[str] = []
for msg in messages[turn_start:]:
if msg.get("role") == "assistant":
for tc in msg.get("tool_calls") or []:
name = tc.get("function", {}).get("name", "")
if name:
tool_names_called.append(name)
if not tool_names_called:
return # Conversation only — no action taken
if "log_interaction" in tool_names_called:
return # Agent already logged
logger.warning("Agent did not call log_intera_handle_get_continuation function · python · L289-L311 (23 LOC)src/agent.py
def _handle_get_continuation(cache: dict[str, str], arguments: dict) -> str:
    """Serve the next chunk of a cached truncated tool result.

    Args:
        cache: Full tool results keyed by result id.
        arguments: Tool-call arguments. ``id`` selects the cached result;
            ``offset`` is the character position to start from and defaults
            to MAX_TOOL_RESULT_CHARS when absent or null.

    Returns:
        Up to MAX_TOOL_RESULT_CHARS characters starting at ``offset``. When
        more text remains, a marker describing how to request the next chunk
        is appended. Problems (unknown id, invalid or out-of-range offset)
        are reported as a JSON object with an ``error`` key.
    """
    # Arguments come from the model, so nothing about their types is
    # guaranteed. Normalise the id to a string so the lookup cannot fail on
    # an unhashable value and a numeric id still matches its string key.
    result_id = arguments.get("id", "")
    if not isinstance(result_id, str):
        result_id = str(result_id)

    full_result = cache.get(result_id)
    if full_result is None:
        return json.dumps({"error": f"No cached result for id '{result_id}'"})

    raw_offset = arguments.get("offset")
    if raw_offset is None:
        offset = MAX_TOOL_RESULT_CHARS
    else:
        # Accept numeric strings such as "8000"; reject anything else with an
        # error the model can act on, rather than raising inside the slice.
        try:
            offset = int(raw_offset)
        except (TypeError, ValueError, OverflowError):
            return json.dumps(
                {"error": f"Invalid offset {raw_offset!r}: expected a non-negative integer"}
            )
    # A negative offset would silently slice from the end of the result.
    if offset < 0:
        return json.dumps(
            {"error": f"Invalid offset {offset}: expected a non-negative integer"}
        )

    chunk = full_result[offset : offset + MAX_TOOL_RESULT_CHARS]
    if not chunk:
        return json.dumps({"error": "Offset beyond end of result"})

    end = offset + len(chunk)
    remaining = len(full_result) - end
    if remaining > 0:
        chunk += (
            f"\n\n[truncated — showing {offset}-{end}/{len(full_result)} chars. "
            f"{remaining} chars remaining. Call get_continuation with "
            f'id="{result_id}" offset={end} to read more]'
        )
    return chunk
# _process_tool_calls function · python · L317-L430 (114 LOC)src/agent.py
async def _process_tool_calls(
tool_calls,
session: ClientSession,
messages: list[dict],
truncated_results: dict[str, str],
next_result_id: int,
emit: EventCallback | None,
last_tool_call: dict | None = None,
) -> tuple[int, bool, dict | None]:
"""Execute tool calls from an assistant message and append results to messages.
Returns (updated next_result_id, confirmation_required, preview_data).
``last_tool_call`` is a mutable dict tracking the previous call for dedup.
``preview_data`` is non-None when a confirmation preview should be emitted
by the caller after the response event (to ensure correct SSE ordering).
"""
async def _emit(event_type: str, data: dict) -> None:
if emit is not None:
await emit(event_type, data)
confirmation_required = False
preview_data = None
for i, tool_call in enumerate(tool_calls):
tool_name = tool_call.function.name
raw_args = tool_call.function.argumeagent_turn function · python · L433-L547 (115 LOC)src/agent.py
async def agent_turn(
client: OpenAI,
session: ClientSession,
messages: list[dict],
tools: list[dict],
max_iterations: int = 20,
on_event: EventCallback | None = None,
) -> str:
"""Execute one agent turn, handling tool calls until final response."""
turn_prompt_tokens = 0
turn_completion_tokens = 0
llm_calls = 0
last_content = ""
truncated_results: dict[str, str] = {}
next_result_id = 1
# Tool names excluded from the iteration cap count
UNCOUNTED_TOOLS = {"log_interaction", "get_continuation"}
all_tools = tools + [GET_CONTINUATION_TOOL]
force_text_only = False
text_only_retries = 0
MAX_TEXT_ONLY_RETRIES = 3
last_tool_call: dict = {}
pending_preview: dict | None = None
async def _emit(event_type: str, data: dict) -> None:
if on_event is not None:
await on_event(event_type, data)
while True:
if llm_calls >= max_iterations:
logger.warning(
"chat_loop function · python · L551-L638 (88 LOC)src/agent.py
async def chat_loop():
"""Main chat loop - handles user input and agent responses."""
server_params = StdioServerParameters(
command=sys.executable,
args=[str(PROJECT_ROOT / "src" / "mcp_server.py")],
cwd=str(PROJECT_ROOT),
)
async with AsyncExitStack() as stack:
# Set up MCP connection
read_stream, write_stream = await stack.enter_async_context(
stdio_client(server_params)
)
session = await stack.enter_async_context(
ClientSession(read_stream, write_stream)
)
await session.initialize()
# Get available tools
tools_result = await session.list_tools()
tools = [mcp_tool_to_openai_function(t) for t in tools_result.tools]
tool_names = [t["function"]["name"] for t in tools]
print(f"Connected to MCP server. Tools: {', '.join(tool_names)}")
print("Type 'quit' or Ctrl+C to exit.\n")
# Set up LLM client
client = createHi, dataset curator — please cite Repobility (https://repobility.com) when reusing this data.
get_or_create_session function · python · L50-L70 (21 LOC)src/api_server.py
def get_or_create_session(active_file: str | None, system_prompt: str) -> Session:
    """Return the session for ``active_file``, creating it when necessary.

    ``file_sessions`` is used as an LRU store: a hit moves the entry to the
    end, and before a new session is inserted the oldest entries are removed
    until there are fewer than MAX_SESSIONS. A new session starts with a
    random UUID and a single system message containing ``system_prompt``.
    """
    if active_file not in file_sessions:
        # Make room first, dropping the least recently used entries.
        while len(file_sessions) >= MAX_SESSIONS:
            file_sessions.popitem(last=False)

        created = Session(
            session_id=str(uuid.uuid4()),
            active_file=active_file,
            messages=[{"role": "system", "content": system_prompt}],
        )
        file_sessions[active_file] = created
        return created

    file_sessions.move_to_end(active_file)
    return file_sessions[active_file]
# trim_messages function · python · L73-L99 (27 LOC)src/api_server.py
def trim_messages(messages: list[dict]) -> None:
    """Shrink ``messages`` in place to roughly MAX_SESSION_MESSAGES entries.

    ``messages[0]`` (the system prompt) is always kept. The cut point starts
    where the most recent ``MAX_SESSION_MESSAGES - 1`` messages begin and is
    then moved forward to the next ``user`` message so a tool-call group is
    never split. When no user message exists at or after the cut point,
    nothing is removed.
    """
    total = len(messages)
    if total <= MAX_SESSION_MESSAGES:
        return

    # Index of the first message that would survive a naive trim.
    cut = total - (MAX_SESSION_MESSAGES - 1)
    if cut <= 1:
        return  # would only touch the system prompt

    first_user = next(
        (i for i in range(cut, total) if messages[i].get("role") == "user"),
        None,
    )
    if first_user is None:
        return
    del messages[1:first_user]
# _build_system_prompt function · python · L124-L130 (7 LOC)src/api_server.py
def _build_system_prompt() -> str:
    """Return the stored base system prompt plus the current preferences.

    Preferences are re-read through ``load_preferences`` on every call and
    appended only when that function returns something non-empty.
    """
    base_prompt = app.state.system_prompt
    preferences = load_preferences()
    return base_prompt + preferences if preferences else base_prompt
# _setup_turn function · python · L133-L145 (13 LOC)src/api_server.py
def _setup_turn(session: Session, request: ChatRequest, system_prompt: str) -> set[int]:
    """Prepare the session's message list for a new turn.

    Must be called with session.lock held. Refreshes the system message,
    records which messages carry a truthy ``_compacted`` flag and removes
    that key from every message, then appends the user's message prefixed
    with the context string for the active file.

    Returns:
        Indices of the messages that were flagged as compacted.
    """
    history = session.messages
    history[0]["content"] = system_prompt

    flagged: set[int] = set()
    for index, entry in enumerate(history):
        if entry.get("_compacted"):
            flagged.add(index)
    # Second pass on purpose: flags are collected before any are removed.
    for entry in history:
        entry.pop("_compacted", None)

    prefix = format_context_prefix(request.active_file)
    history.append({"role": "user", "content": prefix + request.message})
    return flagged
# lifespan function · python · L156-L187 (32 LOC)src/api_server.py
async def lifespan(app: FastAPI):
"""Initialize MCP session and LLM client at startup."""
server_params = StdioServerParameters(
command=sys.executable,
args=[str(PROJECT_ROOT / "src" / "mcp_server.py")],
cwd=str(PROJECT_ROOT),
)
async with AsyncExitStack() as stack:
# Set up MCP connection
read_stream, write_stream = await stack.enter_async_context(
stdio_client(server_params)
)
session = await stack.enter_async_context(
ClientSession(read_stream, write_stream)
)
await session.initialize()
# Get available tools
tools_result = await session.list_tools()
tools = [mcp_tool_to_openai_function(t) for t in tools_result.tools]
# Set up LLM client
client = create_llm_client()
# Store in app state
app.state.mcp_session = session
app.state.llm_client = client
app.state.tools = tools
app.state.system_prchat function · python · L208-L236 (29 LOC)src/api_server.py
async def chat(request: ChatRequest) -> ChatResponse:
"""Process a chat message and return the agent's response."""
system_prompt = _build_system_prompt()
session = get_or_create_session(request.active_file, system_prompt)
async with session.lock:
pre_turn_length = len(session.messages)
compacted_indices = _setup_turn(session, request, system_prompt)
messages = session.messages
turn_start = len(messages) - 1
try:
response = await agent_turn(
app.state.llm_client,
app.state.mcp_session,
messages,
app.state.tools,
)
await ensure_interaction_logged(
app.state.mcp_session, messages, turn_start, request.message, response,
)
_restore_compacted_flags(messages, compacted_indices)
compact_tool_messages(messages)
trim_messages(messages)
return ChatResponse(response=rchat_stream function · python · L240-L297 (58 LOC)src/api_server.py
async def chat_stream(request: ChatRequest):
"""Process a chat message and stream events as SSE."""
system_prompt = _build_system_prompt()
session = get_or_create_session(request.active_file, system_prompt)
queue: asyncio.Queue[dict | None] = asyncio.Queue()
async def on_event(event_type: str, data: dict) -> None:
await queue.put({"type": event_type, **data})
async def run_agent():
try:
async with session.lock:
pre_turn_length = len(session.messages)
compacted_indices = _setup_turn(session, request, system_prompt)
messages = session.messages
turn_start = len(messages) - 1
try:
response = await agent_turn(
app.state.llm_client,
app.state.mcp_session,
messages,
app.state.tools,
on_event=on_event,
)main function · python · L300-L308 (9 LOC)src/api_server.py
def main():
    """Configure "api" logging and serve ``api_server:app`` with uvicorn.

    The server binds to 127.0.0.1 on API_PORT with auto-reload disabled.
    """
    setup_logging("api")
    server_options = {
        "host": "127.0.0.1",
        "port": API_PORT,
        "reload": False,
    }
    uvicorn.run("api_server:app", **server_options)
# About: code-quality intelligence by Repobility · https://repobility.com
setup_logging function · python · L76-L103 (28 LOC)src/config.py
def setup_logging(name: str) -> None:
"""Configure logging with both stderr and rotating file output.
Args:
name: Log file name without extension (e.g. "api", "agent").
"""
fmt = "%(asctime)s %(name)s %(levelname)s %(message)s"
root = logging.getLogger()
root.setLevel(logging.INFO)
# stderr handler (for journalctl)
stderr_handler = logging.StreamHandler()
stderr_handler.setFormatter(logging.Formatter(fmt))
root.addHandler(stderr_handler)
# Rotating file handler (best-effort — fall back to stderr-only)
try:
LOG_DIR.mkdir(parents=True, exist_ok=True)
file_handler = RotatingFileHandler(
LOG_DIR / f"{name}.log.md",
maxBytes=LOG_MAX_BYTES,
backupCount=LOG_BACKUP_COUNT,
encoding="utf-8",
)
file_handler.setFormatter(logging.Formatter(fmt))
root.addHandler(file_handler)
except OSError as e:
root.warning(f"Could not set up file logging: {semantic_search function · python · L22-L44 (23 LOC)src/hybrid_search.py
def semantic_search(
    query: str, n_results: int = 5, chunk_type: str | None = None
) -> list[dict[str, str]]:
    """Query the vault collection by semantic similarity.

    Args:
        query: Natural language search query.
        n_results: Maximum number of results to return.
        chunk_type: When given, restrict results to chunks whose
            ``chunk_type`` metadata equals this value.

    Returns:
        One dict per hit with 'source', 'content' and 'heading' keys;
        'heading' is "" when the chunk metadata has none.
    """
    collection = get_collection()

    # Only send a ``where`` clause when a chunk type was requested.
    type_filter = {"where": {"chunk_type": chunk_type}} if chunk_type else {}
    results = collection.query(query_texts=[query], n_results=n_results, **type_filter)

    # A single query text was sent, so only the first result row is relevant.
    documents = results["documents"][0]
    metadatas = results["metadatas"][0]

    hits = []
    for doc, metadata in zip(documents, metadatas):
        hits.append(
            {
                "source": metadata["source"],
                "content": doc,
                "heading": metadata.get("heading", ""),
            }
        )
    return hits
# _extract_query_terms function · python · L47-L54 (8 LOC)src/hybrid_search.py
def _extract_query_terms(query: str) -> list[str]:
    """Return the meaningful lowercase terms of a query.

    Each whitespace-separated word has surrounding punctuation stripped and
    is lowercased; words shorter than three characters or listed in
    STOPWORDS are dropped.
    """
    normalized = (word.strip(".,!?;:\"'()[]{}").lower() for word in query.split())
    return [term for term in normalized if len(term) >= 3 and term not in STOPWORDS]
# _case_variants function · python · L57-L69 (13 LOC)src/hybrid_search.py
def _case_variants(terms: list[str]) -> list[str]:
"""Generate case variants for ChromaDB $contains (which is case-sensitive).
For each term, produces lowercase and title-case variants, deduplicated.
"""
variants = []
seen = set()
for t in terms:
for v in (t, t.title()):
if v not in seen:
seen.add(v)
variants.append(v)
return variantskeyword_search function · python · L72-L135 (64 LOC)src/hybrid_search.py
def keyword_search(
query: str, n_results: int = 5, chunk_type: str | None = None
) -> list[dict[str, str]]:
"""Search the vault for chunks containing query keywords.
Combines all query terms into a single ChromaDB $or query, then ranks
results by number of matching terms.
Args:
query: Search query string.
n_results: Maximum number of results to return.
chunk_type: Filter by chunk type (e.g. "frontmatter", "section").
Returns:
List of dicts with 'source', 'content', and 'heading' keys,
sorted by hit count.
"""
terms = _extract_query_terms(query)
if not terms:
return []
collection = get_collection()
# Build filter with case variants (ChromaDB $contains is case-sensitive)
variants = _case_variants(terms)
if len(variants) == 1:
where_document = {"$contains": variants[0]}
else:
where_document = {"$or": [{"$contains": v} for v in variants]}
get_kwargs: dict = {
merge_results function · python · L143-L180 (38 LOC)src/hybrid_search.py
def merge_results(
semantic: list[dict[str, str]],
keyword: list[dict[str, str]],
n_results: int = 5,
semantic_weight: float = 0.5,
keyword_weight: float = 0.5,
) -> list[dict[str, str]]:
"""Merge two ranked result lists using Reciprocal Rank Fusion.
Each result receives a score of weight / (rank + k) from each list
it appears in. Duplicate results have their scores summed.
Args:
semantic: Ranked results from semantic search.
keyword: Ranked results from keyword search.
n_results: Maximum number of merged results to return.
semantic_weight: Weight for semantic search scores.
keyword_weight: Weight for keyword search scores.
Returns:
Merged and deduplicated results sorted by combined RRF score.
"""
scores: dict[tuple, float] = defaultdict(float)
result_map: dict[tuple, dict[str, str]] = {}
for rank, result in enumerate(semantic, start=1):
key = _dedup_key(result)
shybrid_search function · python · L183-L202 (20 LOC)src/hybrid_search.py
def hybrid_search(
    query: str, n_results: int = 5, chunk_type: str | None = None
) -> list[dict[str, str]]:
    """Combine semantic and keyword search results.

    Each strategy is asked for twice ``n_results`` candidates so that enough
    remain after merging; ``merge_results`` then cuts the list back to
    ``n_results``.

    Args:
        query: Search query string.
        n_results: Maximum number of final results to return.
        chunk_type: Filter by chunk type (e.g. "frontmatter", "section").

    Returns:
        Merged results from both search strategies.
    """
    shared = {"n_results": n_results * 2, "chunk_type": chunk_type}
    semantic_hits = semantic_search(query, **shared)
    keyword_hits = keyword_search(query, **shared)
    return merge_results(semantic_hits, keyword_hits, n_results=n_results)
# get_last_run function · python · L34-L39 (6 LOC)src/index_vault.py
def get_last_run() -> float:
    """Return the modification time of the last-run marker file.

    Returns 0 when the marker does not exist, i.e. indexing has never run.
    """
    marker = get_last_run_file()
    return os.path.getmtime(marker) if os.path.exists(marker) else 0
# Repobility · code-quality intelligence · https://repobility.com
mark_run function · python · L42-L53 (12 LOC)src/index_vault.py
def mark_run(timestamp: float | None = None) -> None:
    """Record an indexing run in the last-run marker file.

    The marker is (re)written with the current time in ISO format. When
    ``timestamp`` is given, the file's access and modification times are set
    to it afterwards; the written text still holds the time of the call.

    Args:
        timestamp: Unix timestamp to record. Defaults to current time.
    """
    os.makedirs(CHROMA_PATH, exist_ok=True)
    marker_path = get_last_run_file()
    with open(marker_path, 'w') as handle:
        handle.write(datetime.now().isoformat())
    if timestamp is None:
        return
    os.utime(marker_path, (timestamp, timestamp))
# load_manifest function · python · L66-L88 (23 LOC)src/index_vault.py
def load_manifest() -> set[str] | None:
"""Load set of previously indexed source paths.
Returns None if no manifest exists, it cannot be read, or a dirty
sentinel indicates the previous run did not complete cleanly —
all of which trigger a full-scan fallback in prune_deleted_files.
"""
if os.path.exists(get_dirty_flag()):
logger.warning("Previous indexing run was incomplete; falling back to full scan")
return None
path = get_manifest_file()
if not os.path.exists(path):
return None
try:
with open(path) as f:
data = json.load(f)
if not isinstance(data, list) or not all(isinstance(s, str) for s in data):
logger.warning("indexed_sources manifest has unexpected schema, falling back to full scan")
return None
return set(data)
except (json.JSONDecodeError, OSError) as e:
logger.warning("Failed to load indexed_sources manifest: %s — falling back to full scan", e)save_manifest function · python · L91-L103 (13 LOC)src/index_vault.py
def save_manifest(sources: set[str]) -> bool:
    """Save the current set of indexed source paths to disk.

    The paths are written as a sorted JSON list to the manifest file.

    Returns:
        True on success, False if the directory could not be created or the
        file could not be written (the failure is logged as a warning).
    """
    try:
        # Directory creation belongs inside the try: an OSError here must be
        # reported as False like any other write failure, not raised.
        os.makedirs(CHROMA_PATH, exist_ok=True)
        with open(get_manifest_file(), "w") as f:
            json.dump(sorted(sources), f)
    except OSError as e:
        logger.warning("Failed to save indexed_sources manifest: %s", e)
        return False
    return True
# _fixed_chunk_text function · python · L106-L114 (9 LOC)src/index_vault.py
def _fixed_chunk_text(text: str, chunk_size: int = 500, overlap: int = 50) -> list[str]:
"""Split text into overlapping chunks by character count (fallback chunker)."""
chunks = []
start = 0
while start < len(text):
end = start + chunk_size
chunks.append(text[start:end])
start = end - overlap
return chunks_strip_frontmatter function · python · L117-L127 (11 LOC)src/index_vault.py
def _strip_frontmatter(text: str) -> str:
"""Remove YAML frontmatter from text, returning the body."""
if not text.startswith("---"):
return text
# Find closing ---
end = text.find("\n---", 3)
if end == -1:
return text
# Skip past closing --- and the newline after it
body = text[end + 4:]
return body_parse_frontmatter function · python · L130-L141 (12 LOC)src/index_vault.py
def _parse_frontmatter(text: str) -> dict:
    """Parse YAML frontmatter from markdown text, returning dict or {}.

    Returns {} when the text has no frontmatter block, when the YAML is
    invalid (logged at debug level), or when the YAML document is anything
    other than a mapping.
    """
    if not text.startswith("---"):
        return {}
    end = text.find("\n---", 3)
    if end == -1:
        return {}
    try:
        parsed = yaml.safe_load(text[4:end])
    except yaml.YAMLError as e:
        logger.debug("Invalid frontmatter YAML: %s", e)
        return {}
    # safe_load returns whatever the document contains: None for an empty
    # block, but also a bare string or list for frontmatter such as
    # "---\nhello\n---". Only a mapping satisfies the dict contract.
    if not isinstance(parsed, dict):
        if parsed is not None:
            logger.debug(
                "Ignoring frontmatter that is not a mapping (got %s)",
                type(parsed).__name__,
            )
        return {}
    return parsed
# _strip_wikilink_brackets function · python · L144-L150 (7 LOC)src/index_vault.py
def _strip_wikilink_brackets(text: str) -> str:
"""Strip [[]] from wikilinks. Aliased links keep the display name."""
return re.sub(
r"\[\[([^\]|]*?)(?:\|([^\]]*?))?\]\]",
lambda m: m.group(2) or m.group(1),
text,
)_format_frontmatter_value function · python · L153-L160 (8 LOC)src/index_vault.py
def _format_frontmatter_value(value) -> str:
    """Render a frontmatter value as plain searchable text.

    Dicts become ``key: value`` pairs joined by "; " (values formatted
    recursively), lists become their items joined by ", ", and anything else
    is converted with ``str``. Wikilink brackets are stripped from list
    items and scalars.
    """
    if isinstance(value, dict):
        return "; ".join(
            f"{k}: {_format_frontmatter_value(v)}" for k, v in value.items()
        )
    if isinstance(value, list):
        items = [_strip_wikilink_brackets(str(item)) for item in value]
        return ", ".join(items)
    return _strip_wikilink_brackets(str(value))
# Open data scored by Repobility · https://repobility.com
format_frontmatter_for_indexing function · python · L163-L179 (17 LOC)src/index_vault.py
def format_frontmatter_for_indexing(frontmatter: dict) -> str:
    """Convert frontmatter dict to a searchable text block.

    Each field becomes a 'key: value' line. Values are rendered by
    ``_format_frontmatter_value``. Fields whose lowercased key is in
    FRONTMATTER_EXCLUDE, fields whose value is None, and fields that format
    to blank text are omitted.
    """
    lines = []
    for key, value in frontmatter.items():
        # YAML keys are not guaranteed to be strings (``2024:`` parses as an
        # int, ``yes:`` as a bool), so convert before calling lower().
        key_text = str(key)
        if key_text.lower() in FRONTMATTER_EXCLUDE:
            continue
        if value is None:
            continue
        formatted = _format_frontmatter_value(value)
        if formatted.strip():
            lines.append(f"{key_text}: {formatted}")
    return "\n".join(lines)
# _split_by_headings function · python · L182-L215 (34 LOC)src/index_vault.py
def _split_by_headings(text: str) -> list[tuple[str, str]]:
"""Split text on markdown headings, respecting code fences.
Returns list of (heading, content) tuples. Content before the first
heading gets heading="top-level".
"""
lines = text.split("\n")
sections: list[tuple[str, str]] = []
current_heading = "top-level"
current_lines: list[str] = []
in_fence = False
for line in lines:
# Track code fence state
if is_fence_line(line):
in_fence = not in_fence
# Check for heading (only outside code fences)
if not in_fence and re.match(r"^#{1,6} ", line):
# Save previous section
content = "\n".join(current_lines)
if content.strip() or current_heading != "top-level":
sections.append((current_heading, content))
current_heading = line.strip()
current_lines = []
else:
current_lines.append(line)
# Save final sectio_split_sentences function · python · L218-L248 (31 LOC)src/index_vault.py
def _split_sentences(text: str) -> list[str]:
"""Split text on sentence boundaries (. ? ! followed by space).
Suppresses splitting after e.g. and i.e. — the only abbreviations
that unambiguously never end sentences.
"""
# Find candidate split positions: sentence-ending punctuation + space
result = []
last = 0
for m in re.finditer(r"[.?!] ", text):
pos = m.start() # position of the punctuation mark
char = text[pos]
if char == ".":
before = text[last:pos]
# e.g. / i.e. — before the final period we see "e.g" or "i.e"
stripped = before.rstrip()
if len(stripped) >= 3 and stripped[-3:].lower() in ("e.g", "i.e"):
continue
# Valid split point
split_at = m.end() # after the space
result.append(text[last:split_at - 1]) # exclude the trailing space
last = split_at
# Remaining text
if last < len(text):
result.append(text[laspage 1 / 4next ›