Function bodies 154 total
_chunk_sentences function · python · L251-L295 (45 LOC)src/index_vault.py
def _chunk_sentences(
text: str, heading: str, max_chunk_size: int
) -> list[dict]:
"""Accumulate sentences into chunks, falling back to fixed chunks for oversized ones."""
sentences = _split_sentences(text)
if not sentences:
return []
chunks: list[dict] = []
current = ""
for sentence in sentences:
candidate = (current + " " + sentence).strip() if current else sentence
if len(candidate) <= max_chunk_size:
current = candidate
else:
# Flush current buffer
if current:
chunks.append({
"text": current,
"heading": heading,
"chunk_type": "sentence",
})
current = ""
# Check if this single sentence fits
if len(sentence) <= max_chunk_size:
current = sentence
else:
# Sentence too big — fall back to fixed chunking
_chunk_text_block function · python · L298-L345 (48 LOC)src/index_vault.py
def _chunk_text_block(
text: str, heading: str, max_chunk_size: int
) -> list[dict]:
"""Chunk a text block: try whole section, then paragraphs, then sentences."""
if len(text) <= max_chunk_size:
return [{
"text": text,
"heading": heading,
"chunk_type": "section",
}]
# Split on paragraphs (double newlines)
paragraphs = re.split(r"\n\n+", text)
paragraphs = [p.strip() for p in paragraphs if p.strip()]
if len(paragraphs) > 1:
# Try to accumulate paragraphs into chunks
chunks: list[dict] = []
current = ""
for para in paragraphs:
candidate = (current + "\n\n" + para).strip() if current else para
if len(candidate) <= max_chunk_size:
current = candidate
else:
if current:
chunks.append({
"text": current,
"heading": heading,
chunk_markdown function · python · L348-L391 (44 LOC)src/index_vault.py
def chunk_markdown(
text: str, max_chunk_size: int = 1500, frontmatter: dict | None = None,
) -> list[dict]:
"""Chunk markdown text using structure-aware splitting.
Strips frontmatter, splits on headings, then chunks each section
by paragraph and sentence boundaries as needed. Falls back to
fixed character splitting for text with no natural boundaries.
If frontmatter is provided, creates a dedicated frontmatter chunk
prepended to the result list so metadata is searchable.
Returns list of dicts with keys: text, heading, chunk_type.
chunk_type is one of: frontmatter, section, paragraph, sentence, fragment.
"""
if not text or not text.strip():
return []
all_chunks: list[dict] = []
# Create frontmatter chunk if provided
if frontmatter:
fm_text = format_frontmatter_for_indexing(frontmatter)
if fm_text.strip():
all_chunks.append({
"text": fm_text,
"heading": "front_prepare_file_chunks function · python · L395-L424 (30 LOC)src/index_vault.py
def _prepare_file_chunks(
md_file: Path,
) -> tuple[str, list[str], list[str], list[dict]] | None:
"""Read and chunk a file, returning indexing data without touching ChromaDB.
Safe to call from worker threads — pure Python only.
Returns (source, ids, documents, metadatas) or None if the file is empty.
"""
content = md_file.read_text(encoding='utf-8', errors='ignore')
frontmatter = _parse_frontmatter(content)
chunks = chunk_markdown(content, frontmatter=frontmatter)
if not chunks:
return None
source = str(md_file)
ids = []
documents = []
metadatas = []
for i, chunk in enumerate(chunks):
ids.append(hashlib.md5(f"{md_file}_{i}".encode()).hexdigest())
documents.append(f"[{md_file.stem}] {chunk['text']}")
metadatas.append({
"source": source,
"chunk": i,
"heading": chunk["heading"],
"chunk_type": chunk["chunk_type"],
})
return source, ids, doindex_file function · python · L427-L437 (11 LOC)src/index_vault.py
def index_file(md_file: Path) -> None:
    """Re-index one markdown file so the collection holds only its current chunks.

    The file is chunked first; every entry already stored under the file's
    source string is then deleted, and the fresh chunks (if any) are
    upserted. When chunk preparation returns None, only the deletion of the
    old entries takes place.

    Args:
        md_file: Path of the markdown file to index.
    """
    prepared = _prepare_file_chunks(md_file)
    source_key = str(md_file)
    store = get_collection()

    # include=[] — only the ids are needed to drop the previous chunks.
    stale_ids = store.get(where={"source": source_key}, include=[])['ids']
    if stale_ids:
        store.delete(ids=stale_ids)

    if prepared is None:
        return
    _, chunk_ids, chunk_docs, chunk_meta = prepared
    store.upsert(ids=chunk_ids, documents=chunk_docs, metadatas=chunk_meta)

# prune_deleted_files function · python · L440-L473 (34 LOC)src/index_vault.py
def prune_deleted_files(valid_sources: set[str], indexed_sources: set[str] | None = None) -> int:
"""Remove entries for files that no longer exist. Returns count of deleted sources.
Uses a manifest-based fast path when indexed_sources is provided,
falling back to a full metadata scan when it is None (first run or --full).
"""
collection = get_collection()
if indexed_sources is not None:
# Fast path: only examine sources known to be indexed
deleted_sources = indexed_sources - valid_sources
for source in deleted_sources:
collection.delete(where={"source": source})
return len(deleted_sources)
# Slow path: full metadata scan (no manifest available)
all_entries = collection.get(include=["metadatas"])
if not all_entries["ids"]:
return 0
ids_to_delete = []
deleted_sources = set()
for doc_id, metadata in zip(all_entries["ids"], all_entries["metadatas"]):
source = metadata.get("sourceindex_vault function · python · L476-L558 (83 LOC)src/index_vault.py
def index_vault(full: bool = False) -> None:
"""Index the vault, updating only changed files unless full=True."""
scan_start = time.time()
last_run = 0 if full else get_last_run()
# Get all valid markdown files
all_files = get_vault_files(VAULT_PATH)
valid_sources = set(str(f) for f in all_files)
# Load manifest for fast pruning (skip on --full to force full scan).
# Must happen before writing the dirty sentinel so the sentinel from a
# prior incomplete run (if any) is still visible here.
indexed_sources = None if full else load_manifest()
# Write dirty sentinel — removed only after successful manifest save
try:
os.makedirs(CHROMA_PATH, exist_ok=True)
with open(get_dirty_flag(), "w"):
pass
except OSError as e:
logger.warning("Failed to write indexing sentinel: %s — disabling manifest trust", e)
indexed_sources = None
try:
os.remove(get_manifest_file())
except OGenerated by Repobility's multi-pass static-analysis pipeline (https://repobility.com)
_protect_zones function · python · L20-L30 (11 LOC)src/log_chat.py
def _protect_zones(text: str) -> tuple[str, list[str]]:
    """Swap protected zones for numbered placeholders.

    Each match of the fenced-block, inline-code, URL and wikilink patterns
    (applied in that order) is replaced by ``\\x00<index>\\x00``, where
    ``<index>`` is the position of the matched text in the returned list.

    Returns:
        Tuple of (text with placeholders, list of the original matched texts).
    """
    saved: list[str] = []

    def _stash(found: re.Match) -> str:
        slot = len(saved)
        saved.append(found.group(0))
        return f"\x00{slot}\x00"

    for zone_re in (_FENCED_BLOCK, _INLINE_CODE, _URL, _WIKILINK):
        text = zone_re.sub(_stash, text)
    return text, saved

# add_wikilinks function · python · L40-L67 (28 LOC)src/log_chat.py
def add_wikilinks(text: str, note_names: set[str]) -> str:
    """Wrap mentions of known note names in ``[[...]]`` wikilinks.

    Fenced code blocks, inline code, URLs and existing wikilinks are swapped
    out for placeholders by ``_protect_zones`` before matching and handed to
    ``_restore_zones`` afterwards, so they are never modified.

    All eligible names are matched in a single pass. Substituting one name at
    a time lets a shorter name match inside a link that was just created
    (names ``Project Alpha Plan`` and ``Alpha`` used to produce
    ``[[Project [[Alpha]] Plan]]``); a single pass never rescans its own
    output. Longer names come first in the alternation, so the longest name
    that matches at a given position wins.

    Args:
        text: Markdown text to process.
        note_names: Note names eligible for linking. Names shorter than three
            characters are ignored as likely false positives.

    Returns:
        The text with whole-word, case-sensitive mentions turned into links.
    """
    if not note_names:
        return text

    # Strip protected zones
    text, originals = _protect_zones(text)

    candidates = sorted(
        (name for name in note_names if len(name) >= 3),
        key=len,
        reverse=True,
    )
    if candidates:
        alternation = "|".join(re.escape(name) for name in candidates)
        # Whole words only, not already inside wikilinks or backticks.
        pattern = re.compile(
            r'(?<!\[\[)(?<!`)\b(?:' + alternation + r')\b(?!\]\])(?!`)'
        )
        # A callable replacement inserts the match literally; a replacement
        # *string* would have backslashes in a note name processed as escapes.
        text = pattern.sub(lambda m: f"[[{m.group(0)}]]", text)

    # Restore protected zones
    return _restore_zones(text, originals)

# ensure_daily_note_exists function · python · L76-L89 (14 LOC)src/log_chat.py
def ensure_daily_note_exists(path: Path) -> str:
    """Return the daily note's text, creating the note first when it is absent.

    A missing note is created (parent folders included) with a title line
    holding today's date and an empty "Vault Agent Interactions" section.
    If an existing note lacks that section header, the header is appended to
    the *returned* text only; this function does not write it back to disk.

    Args:
        path: Location of the daily note.

    Returns:
        The note content, guaranteed to contain the section header.
    """
    if not path.exists():
        path.parent.mkdir(parents=True, exist_ok=True)
        stamp = datetime.now().strftime("%Y-%m-%d")
        path.write_text(f"# {stamp}\n\n## Vault Agent Interactions\n\n")

    note_text = path.read_text()
    if "## Vault Agent Interactions" in note_text:
        return note_text
    return note_text + "\n## Vault Agent Interactions\n\n"

# format_entry function · python · L92-L132 (41 LOC)src/log_chat.py
def format_entry(
    task_description: str,
    query: str,
    summary: str,
    files: list[str] | None,
    full_response: str | None
) -> str:
    """Format a log entry as a markdown block for the daily note.

    Args:
        task_description: Short label used in the entry heading.
        query: The query text recorded under "Query".
        summary: Text recorded under "Summary"; only used when
            full_response is empty or None.
        files: Referenced file paths, rendered as a bullet list of
            backticked paths ("- None" when empty or None).
        full_response: Full response text. When truthy it is passed through
            add_wikilinks and recorded under "Response" instead of the summary.

    Returns:
        The entry text: a "### HH:MM - <task>" heading, the query, the
        response or summary, the file list, and a closing "---" rule.
    """
    # Local wall-clock time of the call, used in the heading.
    time_now = datetime.now().strftime("%H:%M")
    files_str = "\n".join(f"- `{f}`" for f in files) if files else "- None"
    if full_response:
        # Link mentions of existing vault notes inside the response text.
        note_names = get_vault_note_names()
        full_response = add_wikilinks(full_response, note_names)
        return f"""### {time_now} - {task_description}
**Query:** {query}
**Response:**
{full_response}
**Files referenced:**
{files_str}
---
"""
    else:
        return f"""### {time_now} - {task_description}
**Query:** {query}
**Summary:** {summary}
**Files referenced:**
{files_str}
---
"""

# insert_entry function · python · L135-L150 (16 LOC)src/log_chat.py
def insert_entry(content: str, entry: str) -> str:
    """Place ``entry`` directly below the Vault Agent Interactions header.

    The entry goes after the first occurrence of the header line, separated
    from it by exactly one blank line; any run of newlines that previously
    followed the header is collapsed. When the header is absent the entry is
    appended to the end of the content.

    Args:
        content: Current note text.
        entry: Formatted entry to insert.

    Returns:
        The note text with the entry inserted.
    """
    marker = "## Vault Agent Interactions\n"
    before, found, after = content.partition(marker)
    if not found:
        return content + entry
    # Drop the newlines that followed the header so spacing stays uniform.
    return before + found + "\n" + entry + after.lstrip("\n")

# log_chat function · python · L153-L169 (17 LOC)src/log_chat.py
def log_chat(
    task_description: str,
    query: str,
    summary: str,
    files: list[str] | None = None,
    full_response: str | None = None
) -> str:
    """Record an interaction in today's daily note.

    The note is loaded (and created if needed), a formatted entry is
    inserted under the Vault Agent Interactions header, and the result is
    written back. A confirmation line is printed to stdout.

    Args:
        task_description: Short label for the entry heading.
        query: The query text to record.
        summary: Summary text, used when no full response is given.
        files: Optional list of referenced file paths.
        full_response: Optional full response text.

    Returns:
        The daily note path as a string.
    """
    note_path = get_daily_note_path()
    # The note is ensured before the entry is formatted, as in the original order.
    existing_text = ensure_daily_note_exists(note_path)
    new_entry = format_entry(task_description, query, summary, files, full_response)
    note_path.write_text(insert_entry(existing_text, new_entry))
    print(f"Logged to {note_path}")
    return str(note_path)

# search_results function · python · L11-L39 (29 LOC)src/search_vault.py
def search_results(
query: str,
n_results: int = 5,
mode: str = "hybrid",
chunk_type: str | None = None,
) -> list[dict[str, str]]:
"""Search the vault and return structured results.
Args:
query: Search query string.
n_results: Maximum number of results to return.
mode: Search strategy -- "hybrid" (default), "semantic", or "keyword".
chunk_type: Filter by chunk type (e.g. "frontmatter", "section").
Returns:
List of dicts with 'source' and 'content' keys.
Raises:
ValueError: If mode is not one of the valid options.
"""
if mode not in VALID_MODES:
raise ValueError(f"Invalid search mode '{mode}'. Must be one of: {VALID_MODES}")
if mode == "hybrid":
return hybrid_search(query, n_results, chunk_type=chunk_type)
elif mode == "semantic":
return semantic_search(query, n_results, chunk_type=chunk_type)
else:
return keyword_search(query, n_results, chunk_type=chget_client function · python · L27-L35 (9 LOC)src/services/chroma.py
def get_client() -> chromadb.PersistentClient:
    """Return the shared ChromaDB client, creating it on first use.

    The module-level ``_client`` is checked once without the lock and again
    while holding ``_lock``, so the client is constructed at most once. The
    CHROMA_PATH directory is created before the client is opened on it.
    """
    global _client
    if _client is not None:
        return _client
    with _lock:
        # Re-check under the lock: another caller may have created it meanwhile.
        if _client is None:
            os.makedirs(CHROMA_PATH, exist_ok=True)
            _client = chromadb.PersistentClient(path=CHROMA_PATH)
    return _client

# Open data scored by Repobility · https://repobility.com
get_collection function · python · L38-L45 (8 LOC)src/services/chroma.py
def get_collection() -> chromadb.Collection:
    """Return the shared "obsidian_vault" collection, creating it on first use.

    The module-level ``_collection`` is checked once without the lock and
    again while holding ``_lock``, so ``get_or_create_collection`` runs at
    most once.
    """
    global _collection
    if _collection is not None:
        return _collection
    with _lock:
        if _collection is None:
            # NOTE(review): get_client() may acquire _lock again while it is
            # held here, so _lock is presumably re-entrant — confirm.
            _collection = get_client().get_or_create_collection("obsidian_vault")
    return _collection

# purge_database function · python · L48-L59 (12 LOC)src/services/chroma.py
def purge_database() -> None:
    """Delete and recreate the ChromaDB database from scratch.

    Removes the entire CHROMA_PATH directory and resets singletons so
    the next get_client/get_collection call creates a fresh database.
    Used by ``index_vault.py --reset`` to recover from corrupt or
    cross-platform-incompatible HNSW index files.

    Does nothing to the filesystem when CHROMA_PATH does not exist.
    """
    # reset() is defined elsewhere; it is called before the directory is
    # removed (presumably to drop the cached client/collection — confirm).
    reset()
    if os.path.exists(CHROMA_PATH):
        shutil.rmtree(CHROMA_PATH)
        # NOTE(review): this listing carries no indentation; the log call is
        # assumed to belong to the branch that actually deleted something.
        logger.info("Deleted ChromaDB database at %s", CHROMA_PATH)

# _base_stub function · python · L9-L20 (12 LOC)src/services/compaction.py
def _base_stub(data: dict) -> dict:
"""Extract common fields shared by all stubs."""
stub: dict = {}
if "success" in data:
stub["status"] = "success" if data["success"] else "error"
else:
stub["status"] = "unknown"
if "error" in data:
stub["error"] = data["error"]
if "message" in data:
stub["message"] = data["message"]
return stub_build_generic_stub function · python · L23-L47 (25 LOC)src/services/compaction.py
def _build_generic_stub(data: dict) -> str:
    """Build a stub for tools that have no dedicated handler.

    On top of the base fields this keeps: "path"; for a list-valued
    "results", its length plus the "source" of every dict result that has
    one (under "files", omitted when empty); a "has_content" flag with the
    content length; and "date".

    Args:
        data: Parsed tool result.

    Returns:
        The stub serialized as a JSON string.
    """
    stub = _base_stub(data)

    if "path" in data:
        stub["path"] = data["path"]

    if "results" in data and isinstance(data["results"], list):
        hits = data["results"]
        stub["result_count"] = len(hits)
        sources = [
            hit["source"]
            for hit in hits
            if isinstance(hit, dict) and "source" in hit
        ]
        if sources:
            stub["files"] = sources

    if "content" in data:
        stub["has_content"] = True
        stub["content_length"] = len(data["content"])

    if "date" in data:
        stub["date"] = data["date"]

    return json.dumps(stub)

# _build_search_vault_stub function · python · L50-L64 (15 LOC)src/services/compaction.py
def _build_search_vault_stub(data: dict) -> str:
    """Compact a search_vault result to source, heading and a content snippet.

    Results that are not dicts, or that lack a "source" key, are dropped from
    the compacted list; "result_count" still reports the original length.

    Args:
        data: Parsed tool result.

    Returns:
        The stub serialized as a JSON string.
    """
    stub = _base_stub(data)
    if "results" in data and isinstance(data["results"], list):
        hits = data["results"]
        stub["result_count"] = len(hits)
        compacted = []
        for hit in hits:
            if not (isinstance(hit, dict) and "source" in hit):
                continue
            compacted.append({
                "source": hit["source"],
                "heading": hit.get("heading", ""),
                "snippet": hit.get("content", "")[:COMPACTION_SNIPPET_LENGTH],
            })
        stub["results"] = compacted
    return json.dumps(stub)

# _build_read_file_stub function · python · L67-L88 (22 LOC)src/services/compaction.py
def _build_read_file_stub(data: dict) -> str:
    """Compact a read_file result to previews and its truncation marker.

    Keeps the content length and a leading preview; when the content
    contains "[... truncated at char", everything from the last occurrence
    of that marker onward is kept as "truncation_marker". "transcript" and
    "description" fields are previewed the same way, and "path" is copied.

    Args:
        data: Parsed tool result.

    Returns:
        The stub serialized as a JSON string.
    """
    preview_len = COMPACTION_CONTENT_PREVIEW_LENGTH
    stub = _base_stub(data)

    if "content" in data:
        body = data["content"]
        stub["content_length"] = len(body)
        stub["content_preview"] = body[:preview_len]
        trunc_marker = "[... truncated at char"
        if trunc_marker in body:
            # The marker is known to be present, so rfind cannot return -1.
            stub["truncation_marker"] = body[body.rfind(trunc_marker):]

    if "transcript" in data:
        stub["transcript_preview"] = data["transcript"][:preview_len]
    if "description" in data:
        stub["description_preview"] = data["description"][:preview_len]
    if "path" in data:
        stub["path"] = data["path"]

    return json.dumps(stub)

# _build_list_stub function · python · L91-L99 (9 LOC)src/services/compaction.py
def _build_list_stub(data: dict) -> str:
    """Compact a list-tool result, keeping the results list and total as-is.

    Args:
        data: Parsed tool result.

    Returns:
        The stub serialized as a JSON string, with "result_count" and
        "results" for a list-valued "results", and "total" when present.
    """
    stub = _base_stub(data)
    if "results" in data and isinstance(data["results"], list):
        listing = data["results"]
        stub["result_count"] = len(listing)
        stub["results"] = listing
    if "total" in data:
        stub["total"] = data["total"]
    return json.dumps(stub)

# _build_web_search_stub function · python · L102-L112 (11 LOC)src/services/compaction.py
def _build_web_search_stub(data: dict) -> str:
    """Compact a web_search result to title and URL per hit.

    Non-dict results are dropped from the compacted list; missing titles or
    URLs become empty strings. "result_count" reports the original length.

    Args:
        data: Parsed tool result.

    Returns:
        The stub serialized as a JSON string.
    """
    stub = _base_stub(data)
    if "results" in data and isinstance(data["results"], list):
        hits = data["results"]
        stub["result_count"] = len(hits)
        links = []
        for hit in hits:
            if isinstance(hit, dict):
                links.append({"title": hit.get("title", ""), "url": hit.get("url", "")})
        stub["results"] = links
    return json.dumps(stub)

# About: code-quality intelligence by Repobility · https://repobility.com
build_tool_stub function · python · L127-L142 (16 LOC)src/services/compaction.py
def build_tool_stub(content: str, tool_name: str | None = None) -> str:
"""Build a compact stub from a tool result string.
Dispatches to tool-specific extractors when tool_name is known,
falling back to generic extraction otherwise.
"""
try:
data = json.loads(content)
except (json.JSONDecodeError, TypeError):
summary = content[:200] if len(content) > 200 else content
return json.dumps({"status": "unknown", "summary": summary})
if tool_name and tool_name in _TOOL_STUB_BUILDERS:
return _TOOL_STUB_BUILDERS[tool_name](data)
return _build_generic_stub(data)compact_tool_messages function · python · L145-L166 (22 LOC)src/services/compaction.py
def compact_tool_messages(messages: list[dict]) -> None:
    """Replace tool results with compact stubs, mutating ``messages`` in place.

    Tool names are looked up through the ``tool_calls`` of assistant
    messages, keyed by call id. Every "tool" message not yet flagged
    ``_compacted`` is replaced by a new dict holding only the role, the call
    id, the stubbed content and the ``_compacted`` flag.

    Args:
        messages: Conversation messages; modified in place.
    """
    # Map tool_call_id -> tool name, taken from assistant messages.
    names_by_call: dict[str, str] = {}
    for message in messages:
        if message.get("role") != "assistant":
            continue
        for call in message.get("tool_calls") or ():
            call_id = call.get("id")
            fn_name = call.get("function", {}).get("name")
            if call_id and fn_name:
                names_by_call[call_id] = fn_name

    for idx, message in enumerate(messages):
        if message.get("role") != "tool" or message.get("_compacted"):
            continue
        call_id = message["tool_call_id"]
        fn_name = names_by_call.get(call_id)
        messages[idx] = {
            "role": "tool",
            "tool_call_id": call_id,
            "content": build_tool_stub(message["content"], fn_name),
            "_compacted": True,
        }

# ok function · python · L22-L39 (18 LOC)src/services/vault.py
def ok(data: str | dict | list | None = None, **kwargs) -> str:
"""Return a success JSON response.
Args:
data: Primary response data (string message, dict, or list).
**kwargs: Additional fields to include in the response.
Returns:
JSON string with {"success": true, ...}.
"""
response = {"success": True}
if data is not None:
if isinstance(data, str):
response["message"] = data
elif isinstance(data, (dict, list)):
response["data"] = data
response.update(kwargs)
return json.dumps(response)err function · python · L42-L54 (13 LOC)src/services/vault.py
def err(message: str, **kwargs) -> str:
    """Serialize an error response.

    Args:
        message: Error description, stored under "error".
        **kwargs: Extra fields merged in last, so they override same-named
            keys.

    Returns:
        JSON string of the form {"success": false, "error": ...}.
    """
    return json.dumps({"success": False, "error": message, **kwargs})

# resolve_vault_path function · python · L62-L93 (32 LOC)src/services/vault.py
def resolve_vault_path(path: str, base_path: Path | None = None) -> Path:
"""Resolve a path ensuring it stays within an allowed directory.
Args:
path: Relative path (from base root) or absolute path.
base_path: Base directory to resolve against and constrain to.
Defaults to VAULT_PATH.
Returns:
Resolved absolute Path within the base directory.
Raises:
ValueError: If path escapes base directory or is in excluded directory.
"""
base = base_path if base_path is not None else VAULT_PATH
if Path(path).is_absolute():
resolved = Path(path).resolve()
else:
resolved = (base / path).resolve()
# Security: ensure path is within base directory
try:
resolved.relative_to(base.resolve())
except ValueError:
raise ValueError(f"Path must be within vault: {base}")
# Block excluded directories
if any(excluded in resolved.parts for excluded in EXCLUDED_DIRS):
raise Varesolve_file function · python · L96-L119 (24 LOC)src/services/vault.py
def resolve_file(path: str, base_path: Path | None = None) -> tuple[Path | None, str | None]:
    """Resolve a path inside the vault and check that it is an existing file.

    Args:
        path: Relative path (from base root) or absolute path.
        base_path: Base directory to resolve against; passed through to
            resolve_vault_path.

    Returns:
        (resolved_path, None) on success, or (None, error_message) when
        resolution is rejected, the path does not exist, or it is not a file.
    """
    try:
        candidate = resolve_vault_path(path, base_path=base_path)
    except ValueError as exc:
        return None, str(exc)

    if not candidate.exists():
        failure = f"File not found: {path}"
    elif not candidate.is_file():
        failure = f"Not a file: {path}"
    else:
        return candidate, None
    return None, failure

# resolve_dir function · python · L122-L143 (22 LOC)src/services/vault.py
def resolve_dir(path: str, base_path: Path | None = None) -> tuple[Path | None, str | None]:
    """Resolve a path inside the vault and check that it is an existing folder.

    Args:
        path: Relative path (from base root) or absolute path.
        base_path: Base directory to resolve against; passed through to
            resolve_vault_path.

    Returns:
        (resolved_path, None) on success, or (None, error_message) when
        resolution is rejected, the path does not exist, or it is not a
        directory.
    """
    try:
        candidate = resolve_vault_path(path, base_path=base_path)
    except ValueError as exc:
        return None, str(exc)

    if not candidate.exists():
        failure = f"Folder not found: {path}"
    elif not candidate.is_dir():
        failure = f"Not a folder: {path}"
    else:
        return candidate, None
    return None, failure

# get_relative_path function · python · L146-L155 (10 LOC)src/services/vault.py
def get_relative_path(absolute_path: Path) -> str:
    """Express a path relative to the resolved vault root.

    Args:
        absolute_path: Absolute path to a file within the vault.

    Returns:
        String path relative to the resolved VAULT_PATH.

    Raises:
        ValueError: From Path.relative_to when the path is not located under
            the resolved vault root.
    """
    vault_root = VAULT_PATH.resolve()
    inside_vault = absolute_path.relative_to(vault_root)
    return str(inside_vault)

# Repobility — same analyzer, your code, free for public repos · /scan/
get_vault_files function · python · L163-L178 (16 LOC)src/services/vault.py
def get_vault_files(vault_path: Path | None = None) -> list[Path]:
"""Get all markdown files in vault, excluding tooling directories.
Args:
vault_path: Optional path to vault. Defaults to VAULT_PATH from config.
Returns:
List of Path objects for all markdown files in the vault.
"""
vault = vault_path if vault_path is not None else VAULT_PATH
files = []
for md_file in vault.rglob("*.md"):
if any(excluded in md_file.parts for excluded in EXCLUDED_DIRS):
continue
files.append(md_file)
return filesget_vault_note_names function · python · L181-L190 (10 LOC)src/services/vault.py
def get_vault_note_names(vault_path: Path | None = None) -> set[str]:
    """Return the distinct note names found in the vault.

    Args:
        vault_path: Optional path to vault; forwarded to get_vault_files.

    Returns:
        Set of file stems (file names without the .md extension).
    """
    names: set[str] = set()
    for note_file in get_vault_files(vault_path):
        names.add(note_file.stem)
    return names

# extract_frontmatter function · python · L198-L217 (20 LOC)src/services/vault.py
def extract_frontmatter(file_path: Path) -> dict:
    """Extract YAML frontmatter from a markdown file.

    Only a leading block delimited by ``---`` lines is considered. The block
    must parse to a YAML mapping: a note that merely opens with a horizontal
    rule (``---``, some text, ``---``) parses to a plain string or list, and
    returning that would break callers that treat the result as a dict.

    Args:
        file_path: Path to the markdown file.

    Returns:
        Dictionary of frontmatter fields, or an empty dict when the file
        cannot be read, has no frontmatter block, or the block is invalid
        YAML or not a mapping.
    """
    try:
        content = file_path.read_text(encoding="utf-8", errors="ignore")
    except OSError as e:
        logger.debug("Could not read frontmatter from %s: %s", file_path, e)
        return {}

    match = re.match(r"^---\n(.*?)\n---\n", content, re.DOTALL)
    if not match:
        return {}

    try:
        parsed = yaml.safe_load(match.group(1))
    except yaml.YAMLError:
        return {}
    # safe_load yields None for an empty block and scalars/lists for
    # non-mapping YAML; only a mapping counts as frontmatter.
    return parsed if isinstance(parsed, dict) else {}

# parse_frontmatter_date function · python · L220-L235 (16 LOC)src/services/vault.py
def parse_frontmatter_date(date_value) -> datetime | None:
"""Parse a frontmatter Date field into a datetime object."""
if date_value is None:
return None
date_str = str(date_value).strip()
# Strip wikilink brackets if present: [[2023-08-11]] -> 2023-08-11
if date_str.startswith("[[") and date_str.endswith("]]"):
date_str = date_str[2:-2]
# Try parsing as ISO date
try:
return datetime.strptime(date_str, "%Y-%m-%d")
except ValueError:
return Noneget_file_creation_time function · python · L238-L248 (11 LOC)src/services/vault.py
def get_file_creation_time(file_path: Path) -> datetime | None:
"""Get file creation time, falling back to ctime if birthtime unavailable."""
try:
stat_result = file_path.stat()
# Try birthtime first (available on macOS, some filesystems)
if hasattr(stat_result, "st_birthtime"):
return datetime.fromtimestamp(stat_result.st_birthtime)
# Fall back to ctime (inode change time on Linux)
return datetime.fromtimestamp(stat_result.st_ctime)
except OSError:
return Noneupdate_file_frontmatter function · python · L251-L319 (69 LOC)src/services/vault.py
def update_file_frontmatter(
file_path: Path,
field: str,
value,
remove: bool = False,
append: bool = False,
rename: bool = False,
) -> None:
"""Update frontmatter in a file, preserving body content.
Args:
file_path: Path to the markdown file.
field: Frontmatter field to update.
value: Value to set, or new key name if rename=True.
remove: If True, remove the field instead of setting it.
append: If True, append value to existing list field.
rename: If True, rename field to value (new key name).
Raises:
ValueError: If file has no frontmatter and remove/rename=True.
ValueError: If append=True but field is not a list.
ValueError: If rename=True but field doesn't exist or target already exists.
"""
content = file_path.read_text(encoding="utf-8")
# Parse existing frontmatter and body
match = re.match(r"^---\n(.*?)\n---\n", content, re.DOTALL)
if match:
frdo_update_frontmatter function · python · L322-L371 (50 LOC)src/services/vault.py
def do_update_frontmatter(
path: str,
field: str,
parsed_value,
operation: str,
) -> tuple[bool, str]:
"""Execute a single frontmatter update.
Args:
path: File path (relative or absolute).
field: Frontmatter field to update.
parsed_value: Already-parsed value to set.
operation: "set", "remove", or "append".
Returns:
Tuple of (success, message).
"""
try:
file_path = resolve_vault_path(path)
except ValueError as e:
return False, str(e)
if not file_path.exists():
return False, f"File not found: {path}"
if not file_path.is_file():
return False, f"Not a file: {path}"
try:
update_file_frontmatter(
file_path,
field,
parsed_value,
remove=(operation == "remove"),
append=(operation == "append"),
rename=(operation == "rename"),
)
except ValueError as e:
return False, f"{efind_section function · python · L389-L467 (79 LOC)src/services/vault.py
def find_section(
lines: list[str], heading: str
) -> tuple[int | None, int | None, str | None]:
"""Find a markdown section by heading.
Locates a heading by case-insensitive exact match and determines the section
boundaries. A section extends from the heading line to the next heading of
same or higher level, or end of file. Ignores headings inside code blocks.
Args:
lines: File content split into lines.
heading: Full heading text including # symbols (e.g., "## Meeting Notes").
Returns:
Tuple of (section_start, section_end, None) on success, where section_start
is the line index of the heading and section_end is the line index where
the section ends (exclusive). Returns (None, None, error_message) on failure.
"""
# Parse heading to extract level and text
heading_match = re.match(r"^(#+)\s+(.*)$", heading.strip())
if not heading_match:
return None, None, f"Invalid heading format: {heading}"
Generated by Repobility's multi-pass static-analysis pipeline (https://repobility.com)
do_move_file function · python · L475-L521 (47 LOC)src/services/vault.py
def do_move_file(
source: str,
destination: str,
) -> tuple[bool, str]:
"""Execute a single file move.
Args:
source: Source file path.
destination: Destination file path.
Returns:
Tuple of (success, message).
"""
try:
source_path = resolve_vault_path(source)
except ValueError as e:
return False, str(e)
if not source_path.exists():
return False, f"Source file not found: {source}"
if not source_path.is_file():
return False, f"Source is not a file: {source}"
try:
dest_path = resolve_vault_path(destination)
except ValueError as e:
return False, str(e)
# Handle same source and destination
if source_path == dest_path:
rel_path = get_relative_path(source_path)
return True, f"Already at destination: {rel_path}"
if dest_path.exists():
return False, f"Destination already exists: {destination}"
dest_path.parent.mkdir(parents=True, consume_preview function · python · L538-L543 (6 LOC)src/services/vault.py
def consume_preview(key: tuple) -> bool:
    """Consume a pending preview if one is registered under ``key``.

    Args:
        key: Identifier of the preview in ``_pending_previews``.

    Returns:
        True when the key was pending (it is removed), False otherwise.
    """
    was_pending = key in _pending_previews
    if was_pending:
        _pending_previews.discard(key)
    return was_pending

# format_batch_result function · python · L556-L576 (21 LOC)src/services/vault.py
def format_batch_result(
    operation_name: str,
    results: list[tuple[bool, str]],
) -> str:
    """Summarize the outcome of a batch operation as text.

    Args:
        operation_name: Name shown in the summary line.
        results: One (success, message) pair per item processed.

    Returns:
        A summary line with the success/failure counts, followed by a
        "Succeeded:" and/or "Failed:" bullet list (each preceded by a blank
        line) for whichever groups are non-empty.
    """
    passed: list[str] = []
    errored: list[str] = []
    for success, message in results:
        (passed if success else errored).append(message)

    lines = [f"Batch {operation_name}: {len(passed)} succeeded, {len(errored)} failed"]
    for title, group in (("\nSucceeded:", passed), ("\nFailed:", errored)):
        if group:
            lines.append(title)
            lines.extend(f"- {message}" for message in group)
    return "\n".join(lines)

# _extract_block function · python · L41-L79 (39 LOC)src/tools/files.py
def _extract_block(lines: list[str], block_id: str) -> str | None:
"""Extract a block by its ^blockid suffix and all indented children.
Args:
lines: File content split into lines.
block_id: The block ID to find (without ^ prefix).
Returns:
The anchor line (suffix stripped) plus indented children, or None if not found.
"""
anchor_idx = None
for i, line in enumerate(lines):
m = _BLOCK_ID_RE.search(line)
if m and m.group(1) == block_id:
anchor_idx = i
break
if anchor_idx is None:
return None
# Strip the ^blockid suffix from the anchor line
anchor_line = _BLOCK_ID_RE.sub("", lines[anchor_idx]).rstrip()
# Determine the indentation of the anchor line
anchor_indent = len(anchor_line) - len(anchor_line.lstrip())
# Collect indented children
result_lines = [anchor_line]
for i in range(anchor_idx + 1, len(lines)):
line = lines[i]
if not line.strip()_expand_embeds function · python · L90-L126 (37 LOC)src/tools/files.py
def _expand_embeds(content: str, source_path: Path) -> str:
"""Expand ![[...]] embeds inline in markdown content.
Scans for embed patterns outside code fences, resolves each file,
and replaces the embed syntax with a labeled blockquote.
Args:
content: The raw markdown text.
source_path: Path of the file being read (to detect self-embeds).
Returns:
Content with embeds replaced by expanded blockquotes.
"""
lines = content.split("\n")
result_lines: list[str] = []
in_fence = False
for line in lines:
if is_fence_line(line):
in_fence = not in_fence
result_lines.append(line)
continue
if in_fence:
result_lines.append(line)
continue
# Check for embeds on this line
if "![[" not in line:
result_lines.append(line)
continue
# Protect inline code spans from expansion
new_line = _expand_line_embeds(li_expand_line_embeds function · python · L129-L149 (21 LOC)src/tools/files.py
def _expand_line_embeds(line: str, source_path: Path) -> str:
    """Expand the embeds on one line while leaving inline code untouched.

    Inline code spans are swapped for ``\\x00CODE<n>\\x00`` placeholders,
    each embed match in the remaining text is replaced by the output of
    ``_resolve_and_format``, and the code spans are then put back.

    Args:
        line: A single line of markdown.
        source_path: Path of the file being read; forwarded to
            ``_resolve_and_format``.

    Returns:
        The line with embeds expanded.
    """
    stashed: list[str] = []

    def _stash(found: re.Match) -> str:
        slot = len(stashed)
        stashed.append(found.group(0))
        return f"\x00CODE{slot}\x00"

    def _expand(found: re.Match) -> str:
        return _resolve_and_format(found.group(1), source_path)

    shielded = _INLINE_CODE_RE.sub(_stash, line)
    expanded = _EMBED_RE.sub(_expand, shielded)

    # Put the code spans back in the order they were stashed.
    for slot, original in enumerate(stashed):
        expanded = expanded.replace(f"\x00CODE{slot}\x00", original)
    return expanded

# _resolve_and_format function · python · L152-L188 (37 LOC)src/tools/files.py
def _resolve_and_format(reference: str, source_path: Path) -> str:
"""Resolve an embed reference and return formatted blockquote."""
# Strip display alias: ![[note|alias]] or ![[note#heading|alias]]
target = reference.split("|", 1)[0] if "|" in reference else reference
# Parse reference: split on first #
if "#" in target:
filename, fragment = target.split("#", 1)
else:
filename, fragment = target, None
# Determine file extension — use Path.suffix to check the actual filename,
# not the whole path (folders can contain dots, e.g. "2026.02/daily")
if not Path(filename).suffix:
lookup_name = filename + ".md"
else:
lookup_name = filename
ext = Path(lookup_name).suffix.lower()
# Resolve the file
file_path = _resolve_embed_file(lookup_name, ext)
if file_path is None:
return f"> [Embed error: {reference} — File not found]"
# Self-embed check
try:
if file_path.resolve() == source_resolve_embed_file function · python · L191-L198 (8 LOC)src/tools/files.py
def _resolve_embed_file(lookup_name: str, ext: str) -> Path | None:
    """Locate the file an embed points at.

    The name is first resolved with ``resolve_file`` using its default base;
    if that fails and the extension is one of ``_BINARY_EXTENSIONS``, a
    second attempt is made under ``config.ATTACHMENTS_DIR``.

    Args:
        lookup_name: File name to resolve, extension included.
        ext: The extension to test against ``_BINARY_EXTENSIONS``.

    Returns:
        The resolved Path, or None when no attempt succeeded.
    """
    found, failure = resolve_file(lookup_name)
    if failure and ext in _BINARY_EXTENSIONS:
        found, failure = resolve_file(lookup_name, base_path=config.ATTACHMENTS_DIR)
    return None if failure else found

# Open data scored by Repobility · https://repobility.com
_expand_binary function · python · L201-L243 (43 LOC)src/tools/files.py
def _expand_binary(file_path: Path, reference: str) -> str:
"""Expand a binary embed (audio/image/office) with caching."""
path_str = str(file_path)
try:
mtime = file_path.stat().st_mtime
except OSError:
return f"> [Embed error: {reference} — Cannot stat file]"
cache_key = (path_str, mtime)
if cache_key in _embed_cache:
logger.debug("Cache hit: %s", file_path.name)
expanded = _embed_cache[cache_key]
else:
ext = file_path.suffix.lower()
if ext in AUDIO_EXTENSIONS:
logger.debug("Cache miss: %s — calling audio handler", file_path.name)
raw = handle_audio(file_path)
elif ext in IMAGE_EXTENSIONS:
logger.debug("Cache miss: %s — calling image handler", file_path.name)
raw = handle_image(file_path)
elif ext in OFFICE_EXTENSIONS:
logger.debug("Cache miss: %s — calling office handler", file_path.name)
raw = handle_office(file_path)
_expand_markdown function · python · L246-L276 (31 LOC)src/tools/files.py
def _expand_markdown(file_path: Path, reference: str, fragment: str | None) -> str:
"""Expand a markdown embed (full note, heading section, or block ID)."""
try:
text = file_path.read_text(encoding="utf-8", errors="ignore")
except (OSError, UnicodeDecodeError):
return f"> [Embed error: {reference} — Cannot read file]"
# Strip frontmatter
fm_match = re.match(r"^---\n.*?^---(?:\n|$)", text, re.DOTALL | re.MULTILINE)
body = text[fm_match.end() :] if fm_match else text
if fragment is None:
return _format_embed(reference, body.strip())
if fragment.startswith("^"):
block_id = fragment[1:]
lines = body.split("\n")
extracted = _extract_block(lines, block_id)
if extracted is None:
return f"> [Embed error: {reference} — Block ID not found]"
return _format_embed(reference, extracted)
# Heading section
heading_text = fragment
lines = body.split("\n")
section_start, sectio_find_section_by_text function · python · L279-L323 (45 LOC)src/tools/files.py
def _find_section_by_text(
lines: list[str], heading_text: str,
) -> tuple[int | None, int | None, str | None]:
"""Find a section by heading text (without # prefix).
Searches all heading levels for a case-insensitive match.
"""
target = heading_text.lower().strip()
matches = []
in_fence = False
for i, line in enumerate(lines):
if is_fence_line(line):
in_fence = not in_fence
continue
if in_fence:
continue
m = HEADING_PATTERN.match(line)
if m and m.group(2).strip().lower() == target:
matches.append((i, len(m.group(1)), line))
if not matches:
return None, None, f"Heading not found: {heading_text}"
if len(matches) > 1:
line_nums = ", ".join(str(m[0] + 1) for m in matches)
return None, None, f"Multiple headings match '{heading_text}': lines {line_nums}"
start_idx, level, _ = matches[0]
section_end = len(lines)
in_fence = False