← back to glibalien__obsidian-tools

Function bodies 154 total

All specs Real LLM only Function bodies
_format_embed function · python · L326-L332 (7 LOC)
src/tools/files.py
def _format_embed(reference: str, content: str) -> str:
    """Format expanded embed as a labeled blockquote."""
    if not content.strip():
        return f"> [Embedded: {reference}]\n> (empty)"

    quoted_lines = [f"> {line}" if line.strip() else ">" for line in content.split("\n")]
    return f"> [Embedded: {reference}]\n" + "\n".join(quoted_lines)
read_file function · python · L335-L403 (69 LOC)
src/tools/files.py
def read_file(path: str, offset: int = 0, length: int = 30000) -> str:
    """Read content of a vault note with optional pagination.

    Args:
        path: Path to the note, either relative to vault root or absolute.
        offset: Character position to start reading from (default 0).
        length: Maximum characters to return (default 30000).

    Returns:
        The text content of the note, with pagination markers if truncated.
    """
    # Normalize non-breaking spaces that LLMs sometimes generate in paths
    path = path.replace("\xa0", " ")
    file_path, error = resolve_file(path)

    # For binary files (audio/image/office), fall back to Attachments directory
    # when the path doesn't resolve from vault root. Obsidian stores embeds
    # like ![[file.docx]] in the configured attachments folder.
    if error:
        ext = path.rsplit(".", 1)[-1].lower() if "." in path else ""
        if f".{ext}" in _BINARY_EXTENSIONS:
            file_path, att_error = resolve_file(pa
create_file function · python · L406-L457 (52 LOC)
src/tools/files.py
def create_file(
    path: str,
    content: str = "",
    frontmatter: str | None = None,
) -> str:
    """Create a new markdown note in the vault.

    Args:
        path: Path for the new file (relative to vault or absolute).
              Parent directories will be created if they don't exist.
        content: The body content of the note (markdown).
        frontmatter: Optional YAML frontmatter as JSON string, e.g., '{"tags": ["meeting"]}'.
                    Will be converted to YAML and wrapped in --- delimiters.

    Returns:
        Confirmation message or error.
    """
    # Validate path
    try:
        file_path = resolve_vault_path(path)
    except ValueError as e:
        return err(str(e))

    if file_path.exists():
        return err(f"File already exists: {path}")

    # Parse frontmatter if provided
    frontmatter_yaml = ""
    if frontmatter:
        fm_dict, parse_error = _parse_frontmatter(frontmatter)
        if parse_error:
            return err(parse_erro
_parse_frontmatter function · python · L460-L488 (29 LOC)
src/tools/files.py
def _parse_frontmatter(frontmatter: dict | str | None) -> tuple[dict, str | None]:
    """Normalize frontmatter input into a dictionary.

    Accepts None, a native dict, or a JSON object string.
    """
    if frontmatter is None:
        return {}, None

    if isinstance(frontmatter, dict):
        return frontmatter, None

    if not isinstance(frontmatter, str):
        return {}, (
            "Invalid frontmatter type: expected dict, JSON object string, or null. "
            f"Got {type(frontmatter).__name__}."
        )

    try:
        parsed = json.loads(frontmatter)
    except json.JSONDecodeError as e:
        return {}, f"Invalid frontmatter JSON: {e}"

    if not isinstance(parsed, dict):
        return {}, (
            "Invalid frontmatter JSON: expected a JSON object "
            f"(e.g., {{\"tags\": [\"meeting\"]}}), got {type(parsed).__name__}."
        )

    return parsed, None
_split_frontmatter_body function · python · L491-L513 (23 LOC)
src/tools/files.py
def _split_frontmatter_body(content: str) -> tuple[dict, str]:
    """Split a markdown file's content into frontmatter dict and body string.

    Returns:
        Tuple of (frontmatter_dict, body_string). Frontmatter is empty dict
        if the file has no valid YAML frontmatter block.
    """
    match = re.match(r"^---\n(.*?)^---(?:\n|$)", content, re.DOTALL | re.MULTILINE)
    if not match:
        return {}, content

    try:
        fm = yaml.safe_load(match.group(1))
    except yaml.YAMLError:
        return {}, content

    if fm is None:
        fm = {}
    if not isinstance(fm, dict):
        return {}, content

    body = content[match.end():]
    return fm, body
_merge_frontmatter function · python · L516-L546 (31 LOC)
src/tools/files.py
def _merge_frontmatter(source_fm: dict, dest_fm: dict) -> dict:
    """Merge source frontmatter into destination frontmatter.

    Rules:
    - Fields only in source are added to result.
    - Fields only in destination are kept as-is.
    - Both are lists: union (destination order first, then unique source items).
    - Both exist but destination is scalar: destination wins.
    - Identical values: kept as-is.
    """
    merged = {k: list(v) if isinstance(v, list) else v for k, v in dest_fm.items()}
    for key, src_val in source_fm.items():
        if key not in merged:
            merged[key] = src_val
        elif isinstance(merged[key], list) and isinstance(src_val, list):
            # Fall back to linear scan when items are unhashable (e.g. dicts)
            try:
                existing = set()
                for item in merged[key]:
                    existing.add(item if not isinstance(item, list) else tuple(item))
                for item in src_val:
                    
_split_blocks function · python · L552-L584 (33 LOC)
src/tools/files.py
def _split_blocks(body: str) -> list[tuple[str | None, str]]:
    """Split a markdown body into blocks by headings.

    Each block is a (heading, content) tuple where heading is the full heading
    line (e.g. "## Tasks") or None for content before the first heading.
    Content includes the heading line itself and all text until the next heading.

    Returns:
        List of (heading_context, block_content) tuples. Empty list for empty/whitespace body.
    """
    if not body or not body.strip():
        return []

    headings = list(_HEADING_RE.finditer(body))

    if not headings:
        return [(None, body)]

    blocks = []
    first_pos = headings[0].start()
    if first_pos > 0:
        pre_content = body[:first_pos]
        if pre_content.strip():
            blocks.append((None, pre_content))

    for i, match in enumerate(headings):
        pos = match.start()
        end = headings[i + 1].start() if i + 1 < len(headings) else len(body)
        block_text = body[pos:end]
Repobility · code-quality intelligence platform · https://repobility.com
_merge_bodies function · python · L592-L647 (56 LOC)
src/tools/files.py
def _merge_bodies(source_body: str, dest_body: str) -> tuple[str, dict]:
    """Merge unique blocks from source into destination body.

    Blocks from source that already exist in destination (after normalization)
    are skipped. Unique source blocks are placed under matching headings in
    destination if possible, otherwise appended at the end.

    Returns:
        Tuple of (merged_body, stats_dict) where stats_dict has "blocks_added" count.
    """
    source_blocks = _split_blocks(source_body)
    dest_blocks = _split_blocks(dest_body)

    if not source_blocks:
        return dest_body, {"blocks_added": 0}

    dest_normalized = {_normalize_block(content) for _, content in dest_blocks}

    unique_blocks: list[tuple[str | None, str]] = []
    for heading, content in source_blocks:
        if _normalize_block(content) not in dest_normalized:
            unique_blocks.append((heading, content))

    if not unique_blocks:
        return dest_body, {"blocks_added": 0}

    # Build 
merge_files function · python · L650-L697 (48 LOC)
src/tools/files.py
def merge_files(
    source: str,
    destination: str,
    strategy: str = "smart",
    delete_source: bool | None = None,
) -> str:
    """Merge a source file into a destination file.

    Args:
        source: Path to the source ("from") file.
        destination: Path to the destination ("to") file. Must exist.
        strategy: "smart" (content-aware dedup) or "concat" (simple concatenation).
        delete_source: Delete source after merge. Defaults to True for smart, False for concat.

    Returns:
        JSON response describing what happened.
    """
    if strategy not in ("smart", "concat"):
        return err(f"Invalid strategy: {strategy!r}. Must be 'smart' or 'concat'.")

    if delete_source is None:
        delete_source = strategy == "smart"

    source_path, src_err = resolve_file(source)
    if src_err:
        return err(src_err)

    dest_path, dst_err = resolve_file(destination)
    if dst_err:
        return err(dst_err)

    if source_path == dest_path:
       
_merge_concat function · python · L700-L724 (25 LOC)
src/tools/files.py
def _merge_concat(
    source_path, dest_path, src_content, dst_content, delete_source, source_rel,
) -> str:
    """Append the source text to the destination under a "Merged from" divider.

    The destination content (trailing whitespace stripped), a horizontal-rule
    divider naming ``source_rel``, and the source content (leading whitespace
    stripped) are joined and written to ``dest_path`` as UTF-8. When
    ``delete_source`` is truthy the source file is unlinked afterwards.

    Returns:
        The ``ok(...)`` response on success, or an ``err(...)`` response when
        the write fails or the source cannot be deleted after the merge.
    """
    divider = f"\n\n---\n\n*Merged from {source_rel}:*\n\n"
    combined = "".join((dst_content.rstrip(), divider, src_content.lstrip()))

    try:
        dest_path.write_text(combined, encoding="utf-8")
    except Exception as exc:
        return err(f"Error writing merged file: {exc}")

    # The merge itself has already been written, so a failed delete is
    # reported as such rather than as a failed merge.
    unlink_failure = None
    if delete_source:
        try:
            source_path.unlink()
        except OSError as exc:
            unlink_failure = exc

    dest_rel = str(get_relative_path(dest_path))
    if unlink_failure is not None:
        return err(f"Merged into {dest_rel} but failed to delete source: {unlink_failure}")

    return ok(
        f"Concatenated {source_rel} into {dest_rel}",
        action="concatenated",
        path=dest_rel,
    )
_merge_smart function · python · L727-L778 (52 LOC)
src/tools/files.py
def _merge_smart(
    source_path, dest_path, src_content, dst_content, delete_source, source_rel,
) -> str:
    """Content-aware smart merge with dedup."""
    src_fm, src_body = _split_frontmatter_body(src_content)
    dst_fm, dst_body = _split_frontmatter_body(dst_content)

    bodies_identical = _normalize_block(src_body) == _normalize_block(dst_body)

    merged_fm = _merge_frontmatter(src_fm, dst_fm)
    fm_changed = merged_fm != dst_fm

    if bodies_identical:
        merged_body = dst_body
        blocks_added = 0
    else:
        merged_body, stats = _merge_bodies(src_body, dst_body)
        blocks_added = stats["blocks_added"]

    if not fm_changed and blocks_added == 0:
        action = "identical"
    elif fm_changed and blocks_added == 0:
        action = "frontmatter_merged"
    else:
        action = "content_merged"

    if merged_fm:
        fm_yaml = yaml.dump(merged_fm, default_flow_style=False, allow_unicode=True)
        new_content = f"---\n{fm_yaml}---\n{merge
batch_merge_files function · python · L781-L877 (97 LOC)
src/tools/files.py
def batch_merge_files(
    source_folder: str,
    destination_folder: str,
    recursive: bool = False,
    strategy: str = "smart",
    delete_source: bool | None = None,
    confirm: bool = False,
) -> str:
    """Merge duplicate files between two folders.

    Uses compare_folders to find files with matching names in both folders,
    then merges each source file into the corresponding destination file.
    Files only in source or only in destination are reported but not touched.

    Args:
        source_folder: Folder containing "from" files.
        destination_folder: Folder containing "to" files.
        recursive: Include subfolders. Default False.
        strategy: "smart" (content-aware dedup) or "concat".
        delete_source: Delete source after merge. Defaults to True for smart, False for concat.
        confirm: Must be true to execute when merging more than 5 file pairs.
    """
    from tools.links import compare_folders as _compare_folders

    if strategy not in ("
move_file function · python · L880-L897 (18 LOC)
src/tools/files.py
def move_file(
    source: str,
    destination: str,
) -> str:
    """Relocate a vault file, delegating the work to ``do_move_file``.

    Args:
        source: Current path of the file (relative to vault or absolute).
        destination: New path for the file (relative to vault or absolute).
                    Parent directories will be created if they don't exist.

    Returns:
        ``ok(message)`` when the move succeeded, otherwise ``err(message)``.
    """
    moved, detail = do_move_file(source, destination)
    # Same message either way; only the response wrapper differs.
    respond = ok if moved else err
    return respond(detail)
batch_move_files function · python · L900-L957 (58 LOC)
src/tools/files.py
def batch_move_files(
    moves: list[dict],
    confirm: bool = False,
) -> str:
    """Move multiple vault files to new locations.

    Args:
        moves: List of move operations, each a dict with 'source' and 'destination' keys.
               Example: [{"source": "old/path.md", "destination": "new/path.md"}]
        confirm: Must be true to execute when moving more than 5 files.

    Returns:
        Summary of successes and failures, or confirmation preview for large batches.
    """
    if not moves:
        return err("moves list is empty")

    # Require confirmation for large batches
    if len(moves) > BATCH_CONFIRM_THRESHOLD:
        # Canonical key: stringify each move dict for tuple hashing
        move_keys = tuple(
            (m.get("source", ""), m.get("destination", ""))
            for m in moves if isinstance(m, dict)
        )
        key = ("batch_move_files", move_keys)
        if not (confirm and consume_preview(key)):
            store_preview(key)
          
append_to_file function · python · L960-L981 (22 LOC)
src/tools/files.py
def append_to_file(path: str, content: str) -> str:
    """Add content after the current end of an existing vault file.

    A newline is written before ``content``, so the appended text always
    starts on a new line.

    Args:
        path: Path to the note (relative to vault or absolute).
        content: Content to append to the file.

    Returns:
        Confirmation message or error.
    """
    target, resolve_error = resolve_file(path)
    if resolve_error:
        return err(resolve_error)

    try:
        with target.open("a", encoding="utf-8") as handle:
            handle.write("\n" + content)
    except Exception as exc:
        return err(f"Appending to file failed: {exc}")
    else:
        relative = str(get_relative_path(target))
        return ok(f"Appended to {relative}", path=relative)
Source: Repobility analyzer · https://repobility.com
_get_field_ci function · python · L37-L48 (12 LOC)
src/tools/frontmatter.py
def _get_field_ci(frontmatter: dict, field: str):
    """Get a frontmatter value by case-insensitive field name."""
    # Try exact match first (fast path)
    value = frontmatter.get(field)
    if value is not None:
        return value
    # Fall back to case-insensitive scan
    field_lower = field.lower()
    for key, val in frontmatter.items():
        if key.lower() == field_lower:
            return val
    return None
_normalize_frontmatter_value function · python · L63-L91 (29 LOC)
src/tools/frontmatter.py
def _normalize_frontmatter_value(value):
    """Normalize tool input while preserving legacy JSON-string support.

    Native JSON-compatible values are passed through unchanged.
    String values are parsed as JSON only when they look like JSON containers
    or scalar literals (quoted strings, numbers, true/false/null).
    """
    if not isinstance(value, str):
        return value

    candidate = value.strip()
    if not candidate:
        return value

    looks_like_json = (
        (candidate[0] == "{" and candidate[-1] == "}")
        or (candidate[0] == "[" and candidate[-1] == "]")
        or (candidate[0] == '"' and candidate[-1] == '"')
        or candidate in ("true", "false", "null")
        or bool(_JSON_SCALAR_RE.match(candidate))
    )

    if not looks_like_json:
        return value

    try:
        return json.loads(candidate)
    except (json.JSONDecodeError, TypeError):
        return value
_matches_field function · python · L94-L137 (44 LOC)
src/tools/frontmatter.py
def _matches_field(frontmatter: dict, field: str, value: str, match_type: str) -> bool:
    """Check if a frontmatter dict matches a single field condition.

    Both field names and values are compared case-insensitively.
    Wikilink brackets are stripped before comparison so "Foo" matches "[[Foo]]".
    Non-string/non-list values are converted to strings before comparison.

    Match types:
        contains/equals: positive matching (field must exist and match).
        missing: field must be absent (value ignored).
        exists: field must be present (value ignored).
        not_contains/not_equals: field absent OR doesn't match.
    """
    field_value = _get_field_ci(frontmatter, field)

    # Existence/absence checks — value is ignored
    if match_type == "missing":
        return field_value is None
    if match_type == "exists":
        return field_value is not None

    # Field absent: positive matches fail, negative matches succeed
    if field_value is None:
        ret
_validate_filters function · python · L140-L165 (26 LOC)
src/tools/frontmatter.py
def _validate_filters(
    filters: list[FilterCondition] | None,
) -> tuple[list[dict], str | None]:
    """Check each filter condition and return them as plain dicts.

    ``FilterCondition`` instances are converted with ``model_dump()``; any
    other item is copied with ``dict(...)``. Validation stops at the first
    invalid entry.

    Returns:
        (filter_dicts, error_message). error_message is None on success; on
        failure the list is empty and the message names the offending index.
    """
    if not filters:
        return [], None

    validated = []
    for index, condition in enumerate(filters):
        if isinstance(condition, FilterCondition):
            entry = condition.model_dump()
        else:
            entry = dict(condition)

        if "field" not in entry:
            return [], f"filters[{index}] must have a 'field' key"

        match_type = entry.get("match_type", "contains")
        if match_type not in VALID_MATCH_TYPES:
            return [], (
                f"filters[{index}] match_type must be one of {VALID_MATCH_TYPES}, "
                f"got '{match_type}'"
            )

        # Match types outside NO_VALUE_MATCH_TYPES need a truthy 'value'.
        value_required = match_type not in NO_VALUE_MATCH_TYPES
        if value_required and not entry.get("value"):
            return [], f"filters[{index}] requires 'value' for match_type '{match_type}'"

        validated.append(entry)

    return validated, None
_find_matching_files function · python · L168-L225 (58 LOC)
src/tools/frontmatter.py
def _find_matching_files(
    field: str | None,
    value: str,
    match_type: str,
    parsed_filters: list[dict],
    include_fields: list[str] | None = None,
    folder: Path | None = None,
    recursive: bool = False,
) -> list[str | dict]:
    """Scan vault and return files matching all frontmatter conditions.

    Args:
        field: Primary field to match, or None to skip primary matching (folder-only mode).
        value: Primary value to match.
        match_type: Match strategy for primary field.
        parsed_filters: Additional filter conditions (already validated).
        include_fields: If provided, return dicts with path + these field values.
        folder: If provided, restrict scan to files within this directory.
        recursive: If False (default), only direct children. If True, include subfolders.

    Returns:
        Sorted list of path strings or dicts (when include_fields is set).
    """
    matching = []

    files = get_vault_files()
    if folder:
   
list_files_by_frontmatter function · python · L228-L293 (66 LOC)
src/tools/frontmatter.py
def list_files_by_frontmatter(
    field: str,
    value: str = "",
    match_type: str = "contains",
    filters: list[FilterCondition] | None = None,
    include_fields: list[str] | None = None,
    folder: str = "",
    recursive: bool = False,
    limit: int = LIST_DEFAULT_LIMIT,
    offset: int = 0,
) -> str:
    """Find vault files by frontmatter metadata. Use this for structured queries like "find open tasks for project X", "list all notes tagged Y", or "find notes missing field Z".

    Args:
        field: Frontmatter field name to match (e.g., 'tags', 'project', 'category').
        value: Value to match against. Wikilink brackets are stripped automatically.
            Not required for 'missing' or 'exists' match types.
        match_type: How to match the field value:
            'contains' - substring/list member match (default).
            'equals' - exact match.
            'missing' - field is absent (value ignored).
            'exists' - field is present with any val
update_frontmatter function · python · L296-L343 (48 LOC)
src/tools/frontmatter.py
def update_frontmatter(
    path: str,
    field: str,
    value: str | list | None = None,
    operation: str = "set",
) -> str:
    """Update frontmatter on a vault file.

    Args:
        path: Path to the note (relative to vault or absolute).
        field: Frontmatter field name to update.
        value: Value to set. Required for 'set'/'append'.
            For list-type fields (category, tags, aliases, cssclasses),
            ALWAYS pass an array — even for a single value: ["project"].
            For scalar fields (status, project, date), pass a plain string.
            Never pass comma-separated strings like "person, actor" — that
            becomes a single value, not a list.
        operation: 'set' to replace the field value, 'remove' to delete the field,
            'append' to add a single value to a list field (creates the list if
            missing, skips duplicates). Prefer 'append' over 'set' when adding
            to an existing list — it preserves other values
_confirmation_preview function · python · L346-L356 (11 LOC)
src/tools/frontmatter.py
def _confirmation_preview(
    operation: str, field: str, value: str | None, paths: list, context: str,
) -> str:
    """Build the ``ok(...)`` response that asks for confirmation of a batch change.

    The preview message names the operation and field, includes the value only
    when it is truthy, and reports how many paths are affected followed by
    ``context``. The paths themselves are passed through as ``files``.
    """
    description = f"{operation} '{field}'"
    if value:
        description += f" = '{value}'"

    return ok(
        "Describe this pending change to the user. They will confirm or cancel, then call again with confirm=true.",
        confirmation_required=True,
        preview_message=f"This will {description} on {len(paths)} files{context}.",
        files=paths,
    )
Powered by Repobility — scan your code at https://repobility.com
_needs_confirmation function · python · L359-L369 (11 LOC)
src/tools/frontmatter.py
def _needs_confirmation(
    field: str, value: str | list | None, operation: str,
    paths: list[str], confirm: bool,
) -> bool:
    """Decide whether a batch update must be previewed before it runs.

    A key is built from the field, value, operation and sorted paths. If
    ``confirm`` is truthy and ``consume_preview`` accepts that key, the update
    may proceed and False is returned. Otherwise the key is registered with
    ``store_preview`` and True is returned.
    """
    # List values are turned into tuples so they can sit inside the key tuple.
    if isinstance(value, list):
        frozen_value = tuple(value)
    else:
        frozen_value = value

    ordered_paths = tuple(sorted(paths))
    preview_key = (
        "batch_update_frontmatter", field, frozen_value, operation, ordered_paths,
    )

    confirmed = confirm and consume_preview(preview_key)
    if not confirmed:
        store_preview(preview_key)
        return True
    return False
_resolve_batch_targets function · python · L372-L451 (80 LOC)
src/tools/frontmatter.py
def _resolve_batch_targets(
    paths: list[str] | None,
    target_field: str | None,
    target_value: str | None,
    target_match_type: str,
    target_filters: list[FilterCondition] | None,
    folder: str,
    recursive: bool,
    confirm: bool,
    operation: str,
    field: str,
    value: str | None,
) -> tuple[list[str] | None, str | None]:
    """Resolve target file paths for batch operations.

    Returns:
        (resolved_paths, early_return_json). If early_return_json is not None,
        the caller should return it directly (validation error or confirmation preview).
    """
    folder_path = None
    if folder:
        if paths is not None:
            return None, err("Cannot combine 'folder' with explicit 'paths'")
        folder_path, folder_err = resolve_dir(folder)
        if folder_err:
            return None, err(folder_err)

    if target_field is not None and paths is not None:
        return None, err("Provide either paths or target_field/target_value, not b
batch_update_frontmatter function · python · L454-L524 (71 LOC)
src/tools/frontmatter.py
def batch_update_frontmatter(
    field: str,
    value: str | list | None = None,
    operation: str = "set",
    paths: list[str] | None = None,
    target_field: str | None = None,
    target_value: str | None = None,
    target_match_type: str = "contains",
    target_filters: list[FilterCondition] | None = None,
    folder: str = "",
    recursive: bool = False,
    confirm: bool = False,
) -> str:
    """Apply a frontmatter update to multiple vault files.

    Files can be specified in three ways:
    - paths: Explicit list of file paths.
    - target_field/target_value: Query-based targeting using frontmatter criteria.
    - folder: Target all files in a folder (can combine with target_field to narrow).

    Args:
        field: Frontmatter field name to update.
        value: Value to set. Required for 'set'/'append'.
            For list-type fields (category, tags, aliases, cssclasses),
            ALWAYS pass an array — even for a single value: ["project"].
            For s
search_by_date_range function · python · L527-L603 (77 LOC)
src/tools/frontmatter.py
def search_by_date_range(
    start_date: str,
    end_date: str,
    date_type: str = "modified",
    limit: int = LIST_DEFAULT_LIMIT,
    offset: int = 0,
) -> str:
    """Find vault files within a date range.

    Args:
        start_date: Start of date range (inclusive), format: YYYY-MM-DD.
        end_date: End of date range (inclusive), format: YYYY-MM-DD.
        date_type: Which date to check - "created" (frontmatter Date field,
                   falls back to filesystem creation time) or "modified"
                   (filesystem modification time). Default: "modified".

    Returns:
        Newline-separated list of matching file paths (relative to vault),
        or a message if no files found.
    """
    validated_offset, validated_limit, pagination_error = validate_pagination(offset, limit)
    if pagination_error:
        return err(pagination_error)

    if date_type not in ("created", "modified"):
        return err(f"date_type must be 'created' or 'modified', got '{da
find_backlinks function · python · L15-L44 (30 LOC)
src/tools/links.py
def find_backlinks(note_name: str, limit: int = LIST_DEFAULT_LIMIT, offset: int = 0) -> str:
    """Find all vault files that contain wikilinks to a given note.

    Args:
        note_name: The note name to search for (without brackets or .md extension).
        limit: Maximum number of results to return (default 100).
        offset: Number of results to skip (default 0).

    Returns:
        JSON response with list of file paths that link to the note.
    """
    if not note_name or not note_name.strip():
        return err("note_name cannot be empty")

    note_name = note_name.strip()
    if note_name.endswith(".md"):
        note_name = note_name[:-3]

    validated_offset, validated_limit, pagination_error = validate_pagination(offset, limit)
    if pagination_error:
        return err(pagination_error)

    all_results = _scan_backlinks(note_name)

    if not all_results:
        return ok(f"No backlinks found to [[{note_name}]]", results=[], total=0)

    total = len(all_resu
_scan_backlinks function · python · L47-L61 (15 LOC)
src/tools/links.py
def _scan_backlinks(note_name: str) -> list[str]:
    """Fallback: scan all vault files for backlinks (O(n)).

    A file counts as a backlink when it contains a wikilink whose target is
    ``note_name`` (compared case-insensitively), optionally followed by a
    ``#heading`` / ``#^block`` anchor and/or a ``|alias``:
    ``[[Note]]``, ``[[Note|alias]]``, ``[[Note#Heading]]``,
    ``[[Note#Heading|alias]]``.

    Args:
        note_name: Note name to look for, without brackets.

    Returns:
        Sorted list of ``get_relative_path(...)`` results for files that link
        to the note. Files that cannot be read are skipped.
    """
    # Compile once: the pattern does not change between files. The optional
    # "#..." group lets anchored links count as links to the note itself.
    link_re = re.compile(
        rf"\[\[{re.escape(note_name)}(?:#[^\]|]*)?(?:\|[^\]]+)?\]\]",
        re.IGNORECASE,
    )
    backlinks = []

    for md_file in get_vault_files():
        try:
            content = md_file.read_text(encoding="utf-8", errors="ignore")
        except OSError as e:
            logger.debug("Skipping %s during backlink scan: %s", md_file, e)
            continue
        if link_re.search(content):
            backlinks.append(get_relative_path(md_file))

    return sorted(backlinks)
find_outlinks function · python · L64-L107 (44 LOC)
src/tools/links.py
def find_outlinks(path: str, limit: int = LIST_DEFAULT_LIMIT, offset: int = 0) -> str:
    """Extract all wikilinks from a vault file with resolved paths.

    Args:
        path: Path to the note (relative to vault or absolute).
        limit: Maximum number of results to return (default 100).
        offset: Number of results to skip (default 0).

    Returns:
        JSON response with list of {name, path} objects. path is null
        for unresolved links (non-existent notes).
    """
    file_path, error = resolve_file(path)
    if error:
        return err(error)

    validated_offset, validated_limit, pagination_error = validate_pagination(offset, limit)
    if pagination_error:
        return err(pagination_error)

    try:
        content = file_path.read_text(encoding="utf-8", errors="ignore")
    except Exception as e:
        return err(f"Reading file failed: {e}")

    # Pattern captures note name before optional |alias
    pattern = r"\[\[([^\]|]+)(?:\|[^\]]+)?\]\]"
    m
_build_note_path_map function · python · L110-L130 (21 LOC)
src/tools/links.py
def _build_note_path_map() -> tuple[dict[str, str], dict[str, str]]:
    """Index every vault file for wikilink resolution.

    Returns:
        Tuple of (stem_map, path_map):
        - stem_map: lowercase file stem → relative path; when several files
          share a stem, the one with the shortest relative path is kept.
        - path_map: lowercase relative path with forward slashes and any
          trailing ``.md`` removed → relative path.
    """
    by_stem: dict[str, str] = {}
    by_path: dict[str, str] = {}

    for note in get_vault_files():
        relative = get_relative_path(note)
        # Forward slashes only, so lookups behave the same on every platform.
        slashed = relative.replace("\\", "/")

        stem_key = note.stem.lower()
        current = by_stem.get(stem_key)
        if current is None or len(relative) < len(current):
            by_stem[stem_key] = relative

        # Folder-qualified links are written without the .md extension.
        by_path[slashed.removesuffix(".md").lower()] = relative

    return by_stem, by_path
About: code-quality intelligence by Repobility · https://repobility.com
_resolve_link function · python · L133-L150 (18 LOC)
src/tools/links.py
def _resolve_link(
    name: str, stem_map: dict[str, str], path_map: dict[str, str]
) -> str | None:
    """Resolve a wikilink name to a relative vault path.

    Handles #heading suffixes and folder-prefixed links.
    """
    # Strip #heading suffix for resolution
    base_name = name.split("#")[0] if "#" in name else name

    # Try as bare stem (most common case)
    resolved = stem_map.get(base_name.lower())
    if resolved:
        return resolved

    # Try as folder-qualified path (e.g. [[Projects/note1]])
    normalized = base_name.replace("\\", "/").lower()
    return path_map.get(normalized)
search_by_folder function · python · L153-L196 (44 LOC)
src/tools/links.py
def search_by_folder(
    folder: str,
    recursive: bool = False,
    limit: int = LIST_DEFAULT_LIMIT,
    offset: int = 0,
) -> str:
    """List all markdown files in a vault folder.

    Args:
        folder: Path to the folder (relative to vault or absolute).
        recursive: If True, include files in subfolders. Default: False.
        limit: Maximum number of results to return (default 100).
        offset: Number of results to skip (default 0).

    Returns:
        JSON response with list of file paths (relative to vault),
        or a message if no files found.
    """
    validated_offset, validated_limit, pagination_error = validate_pagination(offset, limit)
    if pagination_error:
        return err(pagination_error)

    folder_path, error = resolve_dir(folder)
    if error:
        return err(error)

    # Use rglob for recursive, glob for non-recursive
    pattern_func = folder_path.rglob if recursive else folder_path.glob

    files = []

    for md_file in pattern_
compare_folders function · python · L199-L269 (71 LOC)
src/tools/links.py
def compare_folders(
    source: str,
    target: str,
    recursive: bool = False,
) -> str:
    """Compare two vault folders by markdown filename stem (case-insensitive).

    Args:
        source: Path to the source folder (relative to vault or absolute).
        target: Path to the target folder (relative to vault or absolute).
        recursive: If True, include files in subfolders. Default: False.

    Returns:
        JSON response with only_in_source, only_in_target, in_both lists and counts.
    """
    source_path, source_err = resolve_dir(source)
    if source_err:
        return err(source_err)

    target_path, target_err = resolve_dir(target)
    if target_err:
        return err(target_err)

    if source_path == target_path:
        return err("Source and target folders are the same")

    source_files = _scan_folder(source_path, recursive)
    target_files = _scan_folder(target_path, recursive)

    source_stems: dict[str, list[str]] = defaultdict(list)
    for stem, p
_scan_folder function · python · L272-L282 (11 LOC)
src/tools/links.py
def _scan_folder(folder_path: Path, recursive: bool) -> list[tuple[str, str]]:
    """Collect the markdown files found under a folder.

    Args:
        folder_path: Folder to scan.
        recursive: When True, descend into subfolders (rglob); otherwise only
            direct children are matched (glob).

    Returns:
        One (lowercased_stem, relative_path) pair per "*.md" file, skipping
        any file whose path contains a component listed in EXCLUDED_DIRS.
    """
    find_matches = folder_path.rglob if recursive else folder_path.glob
    return [
        (candidate.stem.lower(), get_relative_path(candidate))
        for candidate in find_matches("*.md")
        # Drop files that live inside (or are named like) an excluded directory.
        if not any(excluded in candidate.parts for excluded in EXCLUDED_DIRS)
    ]
_read_preferences function · python · L7-L19 (13 LOC)
src/tools/preferences.py
def _read_preferences() -> list[str]:
    """Load the saved preferences from Preferences.md.

    Returns:
        The text of every bullet line with its "- " prefix removed, in file
        order. An empty list is returned when the file does not exist.
    """
    if not PREFERENCES_FILE.exists():
        return []

    bullet = "- "
    text = PREFERENCES_FILE.read_text(encoding="utf-8")
    trimmed_lines = (raw.strip() for raw in text.splitlines())
    # Lines that are not bullet points (blank lines, headings, prose) are ignored.
    return [entry[len(bullet):] for entry in trimmed_lines if entry.startswith(bullet)]
_write_preferences function · python · L22-L27 (6 LOC)
src/tools/preferences.py
def _write_preferences(preferences: list[str]) -> None:
    """Overwrite Preferences.md with one "- " bullet line per preference.

    Every bullet line ends with a newline; an empty list yields an empty file.
    """
    bullet_lines = [f"- {entry}\n" for entry in preferences]
    PREFERENCES_FILE.write_text("".join(bullet_lines), encoding="utf-8")
save_preference function · python · L30-L47 (18 LOC)
src/tools/preferences.py
def save_preference(preference: str) -> str:
    """Save a user preference to Preferences.md in the vault root.

    The preference is stored as a single bullet line. Embedded line breaks
    are collapsed to single spaces: the preferences file is read back line by
    line and only lines starting with "- " are kept, so continuation lines
    would otherwise be dropped (or mistaken for separate preferences) the
    next time the file is rewritten.

    Args:
        preference: The preference text to save (will be added as a bullet point).

    Returns:
        Confirmation message, or an error response when the text is empty.
    """
    if not preference or not preference.strip():
        return err("preference cannot be empty")

    # Flatten to one line: trim each physical line and skip blank ones.
    # For single-line input this is the same as preference.strip().
    segments = (segment.strip() for segment in preference.splitlines())
    preference = " ".join(segment for segment in segments if segment)

    preferences = _read_preferences()
    preferences.append(preference)
    _write_preferences(preferences)

    return ok(f"Saved preference: {preference}")
list_preferences function · python · L50-L61 (12 LOC)
src/tools/preferences.py
def list_preferences() -> str:
    """Return every saved user preference from Preferences.md.

    Returns:
        A response whose results are "N. text" strings numbered from 1, or a
        "No preferences saved." message with empty results when none exist.
    """
    saved = _read_preferences()
    if saved:
        numbered = [f"{position}. {text}" for position, text in enumerate(saved, start=1)]
        return ok(results=numbered)

    return ok("No preferences saved.", results=[])
Repobility · code-quality intelligence platform · https://repobility.com
remove_preference function · python · L64-L84 (21 LOC)
src/tools/preferences.py
def remove_preference(line_number: int) -> str:
    """Delete one saved preference, identified by its position in the list.

    Args:
        line_number: 1-indexed position of the preference to remove.

    Returns:
        Confirmation message naming the removed preference, or an error when
        there are no preferences or the position is out of range.
    """
    saved = _read_preferences()
    total = len(saved)

    if total == 0:
        return err("No preferences to remove")

    out_of_range = line_number < 1 or line_number > total
    if out_of_range:
        return err(f"Invalid line number. Must be between 1 and {total}")

    # Convert the 1-indexed position to a list index.
    dropped = saved.pop(line_number - 1)
    _write_preferences(saved)

    return ok(f"Removed preference: {dropped}")
handle_audio function · python · L25-L53 (29 LOC)
src/tools/readers.py
def handle_audio(file_path: Path) -> str:
    """Transcribe an audio file using Fireworks Whisper API."""
    api_key = os.getenv("FIREWORKS_API_KEY")
    if not api_key:
        return err("FIREWORKS_API_KEY not set")

    try:
        size = file_path.stat().st_size
    except OSError:
        size = 0

    logger.info("Transcribing audio: %s (%d bytes)", file_path.name, size)
    client = OpenAI(api_key=api_key, base_url=FIREWORKS_BASE_URL)
    start = time.perf_counter()
    try:
        with open(file_path, "rb") as f:
            response = client.audio.transcriptions.create(
                model=WHISPER_MODEL,
                file=f,
                response_format="verbose_json",
                timestamp_granularities=["word"],
                extra_body={"diarize": True},
            )
        elapsed = time.perf_counter() - start
        logger.info("Transcribed %s in %.2fs", file_path.name, elapsed)
        return ok(transcript=response.text)
    except Exception as e:
   
handle_image function · python · L56-L96 (41 LOC)
src/tools/readers.py
def handle_image(file_path: Path) -> str:
    """Describe an image using Fireworks vision model."""
    api_key = os.getenv("FIREWORKS_API_KEY")
    if not api_key:
        return err("FIREWORKS_API_KEY not set")

    try:
        size = file_path.stat().st_size
    except OSError:
        size = 0

    logger.info("Describing image: %s (%d bytes)", file_path.name, size)
    client = OpenAI(api_key=api_key, base_url=FIREWORKS_BASE_URL)
    start = time.perf_counter()
    try:
        image_data = file_path.read_bytes()
        b64 = base64.b64encode(image_data).decode("utf-8")

        mime_map = {
            ".png": "image/png", ".jpg": "image/jpeg", ".jpeg": "image/jpeg",
            ".gif": "image/gif", ".webp": "image/webp", ".svg": "image/svg+xml",
        }
        mime = mime_map.get(file_path.suffix.lower(), "image/png")

        response = client.chat.completions.create(
            model=VISION_MODEL,
            messages=[{
                "role": "user",
                "c
handle_office function · python · L99-L123 (25 LOC)
src/tools/readers.py
def handle_office(file_path: Path) -> str:
    """Extract text content from Office documents (.docx, .xlsx, .pptx).

    Args:
        file_path: Path to the document; the reader is chosen from its
            lowercased suffix.

    Returns:
        The selected reader's result, or an error response when the suffix is
        unsupported or the reader raises.
    """
    try:
        byte_count = file_path.stat().st_size
    except OSError:
        # The size only feeds the log line below; fall back to 0 if stat() fails.
        byte_count = 0

    logger.info("Extracting office document: %s (%d bytes)", file_path.name, byte_count)
    suffix = file_path.suffix.lower()
    started = time.perf_counter()
    try:
        readers = {".docx": _read_docx, ".xlsx": _read_xlsx, ".pptx": _read_pptx}
        reader = readers.get(suffix)
        if reader is None:
            return err(f"Unsupported office format: {suffix}")

        extracted = reader(file_path)
        duration = time.perf_counter() - started
        logger.info("Extracted %s in %.2fs", file_path.name, duration)
        return extracted
    except Exception as exc:
        logger.warning("Office extraction failed for %s: %s", file_path.name, exc)
        return err(f"Failed to read {file_path.name}: {exc}")
_read_docx function · python · L126-L170 (45 LOC)
src/tools/readers.py
def _read_docx(file_path: Path) -> str:
    """Extract text from a Word document, preserving headings and tables."""
    from docx import Document as DocxDocument
    from docx.oxml.ns import qn
    from docx.table import Table

    doc = DocxDocument(str(file_path))
    parts = []

    for element in doc.element.body:
        tag = element.tag.split("}")[-1]  # strip namespace
        if tag == "p":
            # Check if it's a heading
            style = element.find(qn("w:pPr"))
            if style is not None:
                pstyle = style.find(qn("w:pStyle"))
                if pstyle is not None:
                    val = pstyle.get(qn("w:val"), "")
                    if val.startswith("Heading"):
                        level = val.replace("Heading", "").strip()
                        try:
                            level = int(level)
                        except ValueError:
                            level = 1
                        text = element.text or ""
         
_read_xlsx function · python · L173-L199 (27 LOC)
src/tools/readers.py
def _read_xlsx(file_path: Path) -> str:
    """Extract Excel data as markdown tables, one per sheet.

    Each sheet with at least one non-empty row becomes a "## <sheet name>"
    heading followed by a markdown table. The first non-empty row is used as
    the table header. Fully empty rows and fully empty sheets are skipped.

    Args:
        file_path: Path to the .xlsx workbook.

    Returns:
        Success response whose content is the sheets' markdown joined by
        blank lines (an empty string when no sheet has data).
    """
    from openpyxl import load_workbook

    wb = load_workbook(str(file_path), read_only=True, data_only=True)
    parts = []

    # A read-only workbook must be closed explicitly, so release it even when
    # reading a sheet raises.
    try:
        for sheet_name in wb.sheetnames:
            ws = wb[sheet_name]
            rows = []
            for row in ws.iter_rows(values_only=True):
                cells = [str(cell) if cell is not None else "" for cell in row]
                if any(cells):  # skip fully empty rows
                    rows.append(cells)

            if not rows:
                continue

            parts.append(f"## {sheet_name}")
            table_rows = ["| " + " | ".join(cells) + " |" for cells in rows]
            # Markdown needs a separator line right after the header row.
            header_sep = "| " + " | ".join("---" for _ in rows[0]) + " |"
            table_rows.insert(1, header_sep)
            parts.append("\n".join(table_rows))
    finally:
        wb.close()

    content = "\n\n".join(parts)
    return ok(content=content)
_read_pptx function · python · L202-L235 (34 LOC)
src/tools/readers.py
def _read_pptx(file_path: Path) -> str:
    """Extract text from PowerPoint slides."""
    from pptx import Presentation

    prs = Presentation(str(file_path))
    parts = []

    for i, slide in enumerate(prs.slides, 1):
        slide_parts = []
        title = None

        # Extract title if present
        if slide.shapes.title and slide.shapes.title.text.strip():
            title = slide.shapes.title.text.strip()

        heading = f"## {title}" if title else f"## Slide {i}"
        slide_parts.append(heading)

        # Extract text from all shapes (excluding title to avoid duplication)
        for shape in slide.shapes:
            if shape == slide.shapes.title:
                continue
            if shape.has_text_frame:
                text = shape.text_frame.text.strip()
                if text:
                    slide_parts.append(text)

        if len(slide_parts) > 1:  # has content beyond heading
            parts.append("\n\n".join(slide_parts))
        elif title:
search_vault function · python · L9-L35 (27 LOC)
src/tools/search.py
def search_vault(
    query: str,
    n_results: int = 5,
    mode: str = "hybrid",
    chunk_type: str = "",
) -> str:
    """Search the Obsidian vault using hybrid search (semantic + keyword).

    Args:
        query: Natural language search query.
        n_results: Number of results to return (default 5).
        mode: Search mode - "hybrid" (default), "semantic", or "keyword".
        chunk_type: Restrict results to one chunk type - "frontmatter",
            "section", "paragraph", "sentence", or "fragment". The default
            empty string applies no filter.

    Returns:
        JSON response with the search results, a "no matches" message with
        empty results, or an error if the search itself fails.
    """
    # An empty chunk_type is passed on as None (no filter).
    type_filter = chunk_type or None
    try:
        hits = search_results(query, n_results, mode, chunk_type=type_filter)
    except Exception as exc:
        return err(f"Search failed: {exc}. Is the vault indexed? Run: python src/index_vault.py")

    if hits:
        return ok(results=hits)

    return ok("No matching documents found", results=[])
Source: Repobility analyzer · https://repobility.com
web_search function · python · L38-L63 (26 LOC)
src/tools/search.py
def web_search(query: str) -> str:
    """Search the web using DuckDuckGo.

    Args:
        query: Search query string; must contain non-whitespace text.

    Returns:
        JSON response with up to 5 results (each with "title", "url" and
        "snippet"), a "no results" message, or an error.
    """
    if not (query and query.strip()):
        return err("query cannot be empty")

    try:
        hits = DDGS().text(query, max_results=5)
    except Exception as exc:
        return err(f"Search failed: {exc}")

    if not hits:
        return ok("No web results found", results=[])

    # Rename the raw result fields to title/url/snippet, with fallbacks for missing keys.
    formatted = []
    for hit in hits:
        formatted.append(
            {
                "title": hit.get("title", "No title"),
                "url": hit.get("href", ""),
                "snippet": hit.get("body", ""),
            }
        )
    return ok(results=formatted)
prepend_to_file function · python · L14-L56 (43 LOC)
src/tools/sections.py
def prepend_to_file(path: str, content: str) -> str:
    """Prepend content to a vault file, inserting after any frontmatter.

    Args:
        path: Path to the note (relative to vault or absolute).
        content: Content to prepend to the file.

    Returns:
        JSON response: {"success": true, "path": "..."} on success,
        or {"success": false, "error": "..."} on failure.
    """
    file_path, error = resolve_file(path)
    if error:
        return err(error)

    try:
        existing_content = file_path.read_text(encoding="utf-8")
    except Exception as e:
        return err(f"Error reading file: {e}")

    # Detect YAML frontmatter (must start at position 0)
    frontmatter_match = re.match(r"^---\n(.*?)\n---\n", existing_content, re.DOTALL)

    if frontmatter_match:
        # Insert after frontmatter with blank line separator
        frontmatter_end = frontmatter_match.end()
        body = existing_content[frontmatter_end:]
        new_content = (
            exis
replace_section function · python · L59-L101 (43 LOC)
src/tools/sections.py
def replace_section(path: str, heading: str, content: str) -> str:
    """Replace a markdown heading and its content with new content.

    Finds a heading by case-insensitive exact match and replaces the entire
    section (heading + content) through to the next heading of same or higher
    level, or end of file.

    Args:
        path: Path to the note (relative to vault or absolute).
        heading: Full heading text including # symbols (e.g., "## Meeting Notes").
        content: Replacement content (can include a heading or not).

    Returns:
        JSON response: {"success": true, "path": "..."} on success,
        or {"success": false, "error": "..."} on failure.
    """
    file_path, error = resolve_file(path)
    if error:
        return err(error)

    # Read file content
    try:
        file_content = file_path.read_text(encoding="utf-8")
    except Exception as e:
        return err(f"Error reading file: {e}")

    lines = file_content.split("\n")

    # Find section
‹ prev · page 3 / 4 · next ›