Function bodies 224 total
recalculate_paths function · python · L41-L55 (15 LOC)python/config.py
def recalculate_paths():
    """Rebuild every derived vault directory constant from the current VAULT_PATH.

    Call this after VAULT_PATH changes so the module-level ``*_DIR`` globals
    point into the new vault.
    """
    global INBOX_DIR, PAPERS_PDF_DIR, PAPERS_MD_DIR, PAPERS_METADATA_DIR
    global NOTES_DIR, RESEARCH_DIR, WRITING_DIR, ASSETS_DIR, CONFIG_DIR

    vault_root = Path(VAULT_PATH)
    papers_root = vault_root / "01_Papers"

    # Numbered top-level folders.
    INBOX_DIR = vault_root / "00_Inbox"
    NOTES_DIR = vault_root / "02_Notes"
    RESEARCH_DIR = vault_root / "03_Research"
    WRITING_DIR = vault_root / "04_Writing"
    ASSETS_DIR = vault_root / "05_Assets"

    # Per-format folders for paper artefacts.
    PAPERS_PDF_DIR = papers_root / "pdf"
    PAPERS_MD_DIR = papers_root / "md"
    PAPERS_METADATA_DIR = papers_root / "metadata"

    # Hidden folder for application settings.
    CONFIG_DIR = vault_root / ".config"
# ensure_dirs function · python · L58-L75 (18 LOC)python/config.py
def ensure_dirs():
    """Create the vault folders, plus one sub-folder per note category, if missing."""
    top_level = (
        INBOX_DIR,
        PAPERS_PDF_DIR,
        PAPERS_MD_DIR,
        PAPERS_METADATA_DIR,
        NOTES_DIR,
        RESEARCH_DIR,
        WRITING_DIR,
        ASSETS_DIR,
        CONFIG_DIR,
    )
    for folder in top_level:
        folder.mkdir(parents=True, exist_ok=True)

    # Each note category gets its own folder under the notes directory.
    for category in NOTE_CATEGORIES:
        category_dir = NOTES_DIR / category
        category_dir.mkdir(parents=True, exist_ok=True)
# _read_matrix function · python · L22-L32 (11 LOC)python/handlers/matrix.py
def _read_matrix():
    """Load the matrix CSV and return ``(headers, rows)``.

    ``headers`` is the list of column names and ``rows`` is a list of plain
    dicts, one per data row. Both are empty when the file is missing or has
    no header row.
    """
    source = _matrix_csv()
    if not source.exists():
        return [], []

    with open(source, encoding="utf-8", newline="") as handle:
        table = csv.DictReader(handle)
        columns = table.fieldnames
        if columns is None:
            # Empty file: there is no header row to read.
            return [], []
        records = [dict(record) for record in table]
        return list(columns), records
# _write_matrix function · python · L35-L41 (7 LOC)python/handlers/matrix.py
def _write_matrix(headers, rows):
    """Overwrite the matrix CSV with ``headers`` followed by ``rows``.

    The parent directory is created when needed. Row keys that are not
    listed in ``headers`` are dropped silently.
    """
    target = _matrix_csv()
    target.parent.mkdir(parents=True, exist_ok=True)
    with open(target, "w", encoding="utf-8", newline="") as handle:
        out = csv.DictWriter(handle, fieldnames=headers, extrasaction="ignore")
        out.writeheader()
        for record in rows:
            out.writerow(record)
# _classify_strength function · python · L44-L50 (7 LOC)python/handlers/matrix.py
def _classify_strength(value: str) -> str:
    """Map a matrix cell to ``"strong"``, ``"moderate"`` or ``"weak"``.

    The comparison ignores surrounding whitespace and letter case. Anything
    not listed in STRONG_VALUES or MODERATE_VALUES counts as weak.
    """
    normalized = value.strip().lower()
    buckets = (("strong", STRONG_VALUES), ("moderate", MODERATE_VALUES))
    for label, members in buckets:
        if normalized in members:
            return label
    return "weak"
# MatrixUpdateBody class · python · L53-L55 (3 LOC)python/handlers/matrix.py
class MatrixUpdateBody(BaseModel):
    # Request payload carrying a complete replacement for the matrix.
    # Column names; used as the CSV header row when the matrix is written.
    headers: List[str]
    # One mapping per matrix row, keyed by column name.
    rows: List[Dict[str, Any]]
# get_matrix function · python · L59-L62 (4 LOC)python/handlers/matrix.py
async def get_matrix():
    """Return the stored matrix with its claim columns singled out.

    Every column after the first two is reported as a claim column.
    """
    headers, rows = _read_matrix()
    # Slicing past the end yields [], so short header lists need no special case.
    claim_columns = headers[2:]
    return {"headers": headers, "claim_columns": claim_columns, "rows": rows}
# Repobility — same analyzer, your code, free for public repos · /scan/
get_matrix_stats function · python · L66-L85 (20 LOC)python/handlers/matrix.py
async def get_matrix_stats():
    """Summarise how much of the matrix is filled in and how strong the cells are.

    Returns a dict with:
      * ``total_papers`` - number of rows,
      * ``total_claims`` - number of columns after the first two,
      * ``coverage_pct`` - percentage of non-blank claim cells (0.0 for an
        empty matrix), rounded to two decimals,
      * ``strength_distribution`` - count of non-blank cells per strength
        label.
    """
    headers, rows = _read_matrix()
    claim_columns = headers[2:]
    total_papers = len(rows)
    total_claims = len(claim_columns)

    strength_dist = {"strong": 0, "moderate": 0, "weak": 0}
    filled_cells = 0
    for row in rows:
        for col in claim_columns:
            # A CSV row shorter than the header leaves the cell as None
            # (csv.DictReader's restval), so coerce before stripping.
            val = (row.get(col) or "").strip()
            if not val:
                continue
            filled_cells += 1
            strength_dist[_classify_strength(val)] += 1

    # With no rows or no claim columns the loops do nothing and coverage is 0.0.
    total_cells = total_papers * total_claims
    coverage_pct = round((filled_cells / total_cells) * 100, 2) if total_cells else 0.0
    return {
        "total_papers": total_papers,
        "total_claims": total_claims,
        "coverage_pct": coverage_pct,
        "strength_distribution": strength_dist,
    }
# update_matrix function · python · L89-L93 (5 LOC)python/handlers/matrix.py
async def update_matrix(body: MatrixUpdateBody):
    """Replace the stored matrix with the submitted headers and rows.

    Raises:
        HTTPException: 400 when no headers are supplied.
    """
    headers, rows = body.headers, body.rows
    if not headers:
        raise HTTPException(status_code=400, detail="headers must not be empty")

    _write_matrix(headers, rows)
    return {"message": "Matrix updated successfully", "rows_written": len(rows)}
# _parse_frontmatter function · python · L15-L26 (12 LOC)python/handlers/notes.py
def _parse_frontmatter(text: str) -> tuple:
    """Split a note into ``(frontmatter, body)``.

    A note that starts with ``---`` and contains a second ``---`` is treated
    as having a YAML front-matter block. The parsed mapping is returned with
    the remaining text (leading newlines removed). In every other case the
    result is ``({}, text)``.

    The front matter is always a dict: invalid YAML, an empty block, or YAML
    whose top level is not a mapping (a bare string or list, say) all yield
    ``{}`` so callers can safely use ``.get``.
    """
    if not text.startswith("---"):
        return {}, text

    parts = text.split("---", 2)
    if len(parts) < 3:
        # Opening delimiter without a closing one: not front matter.
        return {}, text

    try:
        frontmatter = yaml.safe_load(parts[1])
    except yaml.YAMLError:
        frontmatter = None
    if not isinstance(frontmatter, dict):
        frontmatter = {}

    return frontmatter, parts[2].lstrip("\n")
# _safe_category function · python · L29-L32 (4 LOC)python/handlers/notes.py
def _safe_category(category: str) -> str:
    """Return ``category`` unchanged if it is a configured note category.

    Raises:
        HTTPException: 400 when the category is not in config.NOTE_CATEGORIES.
    """
    if category in config.NOTE_CATEGORIES:
        return category
    raise HTTPException(status_code=400, detail=f"Unknown category '{category}'")
# _note_path function · python · L35-L39 (5 LOC)python/handlers/notes.py
def _note_path(category: str, filename: str) -> Path:
    """Build the path of a note inside its category folder.

    ``.md`` is appended when ``filename`` does not already end with it.

    Raises:
        HTTPException: 400 when the category is unknown, or when ``filename``
            contains directory components (separators, a drive prefix) that
            would place the result outside the category folder.
    """
    _safe_category(category)
    # A bare file name is its own final path component; anything else carries
    # directory parts and must not be joined onto the notes directory.
    if Path(filename).name != filename:
        raise HTTPException(status_code=400, detail=f"Invalid filename '{filename}'")
    if not filename.endswith(".md"):
        filename = filename + ".md"
    return config.NOTES_DIR / category / filename
# list_notes function · python · L43-L65 (23 LOC)python/handlers/notes.py
async def list_notes():
    """List every Markdown note in every configured category.

    Each entry carries the note id (``category/filename``), the file name,
    and the title, authors, DOI, tags and date read from the note's front
    matter. The title falls back to the file stem and tags to an empty list.
    Unreadable files are skipped.
    """
    notes: List[dict] = []
    for category in config.NOTE_CATEGORIES:
        cat_dir = config.NOTES_DIR / category
        if not cat_dir.exists():
            continue
        for md_file in sorted(cat_dir.glob("*.md")):
            try:
                text = md_file.read_text(encoding="utf-8")
            except (OSError, UnicodeError):
                # Best effort: one unreadable or badly encoded note must not
                # break the whole listing.
                continue
            fm, _ = _parse_frontmatter(text)
            if not isinstance(fm, dict):
                # Front matter that parses to a scalar or list has no fields.
                fm = {}
            notes.append({
                "id": f"{category}/{md_file.name}",
                "filename": md_file.name,
                "title": fm.get("title") or md_file.stem,
                "authors": fm.get("authors") or fm.get("author"),
                "doi": fm.get("doi") or fm.get("DOI"),
                "category": category,
                "tags": fm.get("tags") or [],
                "date": fm.get("date") or fm.get("created"),
            })
    return notes
# get_note function · python · L69-L73 (5 LOC)python/handlers/notes.py
async def get_note(category: str, filename: str):
    """Return the raw text of one note.

    Raises:
        HTTPException: 404 when the note file does not exist.
    """
    target = _note_path(category, filename)
    if target.exists():
        return target.read_text(encoding="utf-8")
    raise HTTPException(status_code=404, detail=f"Note '{filename}' not found in '{category}'")
# delete_note function · python · L77-L82 (6 LOC)python/handlers/notes.py
async def delete_note(category: str, filename: str):
    """Delete one note file and confirm the deletion.

    Raises:
        HTTPException: 404 when the note file does not exist.
    """
    target = _note_path(category, filename)
    missing = not target.exists()
    if missing:
        raise HTTPException(status_code=404, detail=f"Note '{filename}' not found in '{category}'")

    target.unlink()
    confirmation = f"Note '{filename}' deleted from '{category}'"
    return {"message": confirmation}
# Repobility · MCP-ready · https://repobility.com
_doi_from_filename function · python · L21-L28 (8 LOC)python/handlers/papers.py
def _doi_from_filename(name: str) -> Optional[str]:
    """Extract a DOI from a file name, or return None when none is found.

    DOI_PATTERN is tried first and its whole match is used. Otherwise the
    two groups captured by DOI_SAFE_PATTERN are joined with ``/``. Trailing
    dots are removed in both cases.
    """
    direct = DOI_PATTERN.search(name)
    if direct is not None:
        return direct.group(0).rstrip(".")

    encoded = DOI_SAFE_PATTERN.search(name)
    if encoded is None:
        return None
    prefix, suffix = encoded.group(1), encoded.group(2)
    return f"{prefix}/{suffix}".rstrip(".")
# _doi_from_pdf function · python · L31-L42 (12 LOC)python/handlers/papers.py
def _doi_from_pdf(pdf_path: Path) -> Optional[str]:
    """Search the first three pages of a PDF for a DOI.

    Returns the first DOI_PATTERN match with trailing ``.``, ``,`` and ``)``
    removed, or None when nothing matches. This is a best-effort helper: any
    failure to open or read the PDF also yields None.
    """
    try:
        doc = fitz.open(str(pdf_path))
        try:
            for page_num in range(min(3, len(doc))):
                m = DOI_PATTERN.search(doc[page_num].get_text())
                if m:
                    return m.group(0).rstrip(".,)")
        finally:
            # Close on every path, including the early return above and any
            # extraction error, so the file handle is never leaked.
            doc.close()
    except Exception:
        # Deliberately broad: an unreadable PDF simply means "no DOI found".
        return None
    return None
# _pdf_to_markdown_pymupdf function · python · L49-L80 (32 LOC)python/handlers/papers.py
def _pdf_to_markdown_pymupdf(pdf_path: Path, doi: Optional[str] = None) -> str:
    """Convert a PDF to simple Markdown using PyMuPDF text extraction.

    The output starts with a ``#`` heading, an optional DOI line and a rule,
    followed by one ``## Page N`` section per page that has text. The heading
    is the largest-font non-blank text span on the first page; it falls back
    to the file stem when no such span can be determined.

    Args:
        pdf_path: PDF file to convert.
        doi: DOI to mention under the heading, if known.

    Returns:
        The Markdown document as a single string.
    """
    doc = fitz.open(str(pdf_path))
    try:
        title = pdf_path.stem
        try:
            blocks = doc[0].get_text("dict")["blocks"]
            max_size = 0
            for block in blocks:
                if block.get("type") != 0:
                    continue
                for line in block.get("lines", []):
                    for span in line.get("spans", []):
                        text = span["text"].strip()
                        # Blank spans are skipped so they can never become
                        # the title.
                        if text and span["size"] > max_size:
                            max_size = span["size"]
                            title = text
        except Exception:
            # Title detection is best effort; keep whatever title we have.
            pass

        lines = [f"# {title}\n"]
        if doi:
            lines.append(f"**DOI:** {doi}\n")
        lines.append("---\n")
        for page_num in range(len(doc)):
            text = doc[page_num].get_text()
            if text.strip():
                lines.append(f"\n## Page {page_num + 1}\n")
                lines.append(text)
    finally:
        # Release the document even when page extraction fails.
        doc.close()
    return "\n".join(lines)
# _parse_grobid_tei function · python · L83-L122 (40 LOC)python/handlers/papers.py
def _parse_grobid_tei(tei_xml: str) -> str:
try:
root = ET.fromstring(tei_xml)
ns = {"tei": "http://www.tei-c.org/ns/1.0"}
md_lines = []
title_el = root.find(".//tei:titleStmt/tei:title", ns)
if title_el is not None and title_el.text:
md_lines.append(f"# {title_el.text.strip()}\n")
authors = []
for author in root.findall(".//tei:fileDesc//tei:author", ns):
forename = author.find(".//tei:forename", ns)
surname = author.find(".//tei:surname", ns)
parts = []
if forename is not None and forename.text:
parts.append(forename.text.strip())
if surname is not None and surname.text:
parts.append(surname.text.strip())
if parts:
authors.append(" ".join(parts))
if authors:
md_lines.append(f"**Authors:** {', '.join(authors)}\n")
abstract_el = root.find(".//tei:abstract", ns)
_process_pdf function · python · L125-L163 (39 LOC)python/handlers/papers.py
async def _process_pdf(pdf_path: Path) -> dict:
doi = _doi_from_filename(pdf_path.stem)
if not doi:
doi = _doi_from_pdf(pdf_path)
safe_name = _doi_to_safe(doi) if doi else pdf_path.stem
dest_pdf = config.PAPERS_PDF_DIR / f"{safe_name}.pdf"
pdf_path.rename(dest_pdf)
md_content = ""
used_grobid = False
try:
async with httpx.AsyncClient(timeout=2.0) as client:
with open(dest_pdf, "rb") as f:
resp = await client.post(
f"{config.GROBID_URL}/api/processFulltextDocument",
files={"input": (dest_pdf.name, f, "application/pdf")},
data={"consolidateHeader": "1"},
)
if resp.status_code == 200:
md_content = _parse_grobid_tei(resp.text)
if md_content.strip():
used_grobid = True
except Exception:
pass
if not md_content.strip():
md_content = _pdf_to_markdown_pymup_load_paper_index function · python · L166-L173 (8 LOC)python/handlers/papers.py
def _load_paper_index() -> dict:
    """Load ``index.json`` from the papers metadata directory.

    Always returns a dict. A missing, unreadable or malformed file, or a file
    whose top-level JSON value is not an object, yields ``{}`` so callers can
    use ``.get`` without further checks.
    """
    index_path = config.PAPERS_METADATA_DIR / "index.json"
    try:
        data = json.loads(index_path.read_text(encoding="utf-8"))
    except (OSError, ValueError):
        # OSError: missing or unreadable file. ValueError: invalid JSON or
        # invalid UTF-8 (JSONDecodeError and UnicodeDecodeError subclass it).
        return {}
    return data if isinstance(data, dict) else {}
# _save_paper_index function · python · L176-L178 (3 LOC)python/handlers/papers.py
def _save_paper_index(index: dict) -> None:
    """Write ``index`` to ``index.json`` in the papers metadata directory.

    The directory is created first, so saving works even before the vault
    folders have been set up.
    """
    config.PAPERS_METADATA_DIR.mkdir(parents=True, exist_ok=True)
    index_path = config.PAPERS_METADATA_DIR / "index.json"
    index_path.write_text(json.dumps(index, ensure_ascii=False, indent=2), encoding="utf-8")
# _list_papers function · python · L181-L217 (37 LOC)python/handlers/papers.py
def _list_papers() -> List[dict]:
papers = []
paper_ids: set = set()
if config.PAPERS_PDF_DIR.exists():
for pdf in config.PAPERS_PDF_DIR.glob("*.pdf"):
paper_ids.add(pdf.stem)
if config.PAPERS_MD_DIR.exists():
for md in config.PAPERS_MD_DIR.glob("*.md"):
paper_ids.add(md.stem)
if config.PAPERS_METADATA_DIR.exists():
for meta in config.PAPERS_METADATA_DIR.glob("*.json"):
if meta.stem != "index":
paper_ids.add(meta.stem)
index = _load_paper_index()
for pid in sorted(paper_ids):
meta_file = config.PAPERS_METADATA_DIR / f"{pid}.json"
meta = {}
if meta_file.exists():
try:
meta = json.loads(meta_file.read_text(encoding="utf-8"))
except Exception:
pass
idx_entry = index.get(pid, {})
merged = {**idx_entry, **meta}
papers.append({
"id": pid,
"doi": merged.get("doi"Repobility · code-quality intelligence · https://repobility.com
upload_papers function · python · L226-L241 (16 LOC)python/handlers/papers.py
async def upload_papers(files: List[UploadFile] = File(...)):
    """Store uploaded PDFs in the inbox and process each one.

    Returns ``{"results": [...]}`` with one entry per upload, in order. A
    failed entry has ``success`` False and an ``error`` message, and a failure
    on one file never aborts the rest of the batch. Uploads whose name does
    not end in ``.pdf`` are rejected without being written.
    """
    results = []
    for upload in files:
        # The client may omit the file name entirely.
        original_name = upload.filename or ""
        if not original_name.lower().endswith(".pdf"):
            results.append({"filename": upload.filename, "success": False, "error": "Not a PDF"})
            continue

        # The file name is client-controlled: keep only its final component
        # (treating backslashes as separators too) so it cannot point outside
        # the inbox directory.
        safe_name = Path(original_name.replace("\\", "/")).name
        try:
            inbox_path = config.INBOX_DIR / safe_name
            inbox_path.write_bytes(await upload.read())
            result = await _process_pdf(inbox_path)
            result["filename"] = upload.filename
            results.append(result)
        except Exception as e:
            # Per-file boundary: report the failure and carry on.
            results.append({"filename": upload.filename, "success": False, "error": str(e)})
    return {"results": results}
# get_paper_markdown function · python · L245-L249 (5 LOC)python/handlers/papers.py
async def get_paper_markdown(paper_id: str):
    """Return the converted Markdown of a paper as ``{"content": ...}``.

    Raises:
        HTTPException: 400 when ``paper_id`` contains directory components,
            404 when no Markdown file exists for the paper.
    """
    # The id becomes a file name, so it must not carry path parts that would
    # resolve outside the Markdown directory.
    if Path(paper_id).name != paper_id:
        raise HTTPException(status_code=400, detail=f"Invalid paper id '{paper_id}'")
    md_path = config.PAPERS_MD_DIR / f"{paper_id}.md"
    if not md_path.exists():
        raise HTTPException(status_code=404, detail=f"Markdown not found for paper '{paper_id}'")
    return {"content": md_path.read_text(encoding="utf-8")}
# get_paper_metadata function · python · L253-L257 (5 LOC)python/handlers/papers.py
async def get_paper_metadata(paper_id: str):
    """Return the parsed JSON metadata stored for a paper.

    Raises:
        HTTPException: 400 when ``paper_id`` contains directory components,
            404 when no metadata file exists for the paper.
    """
    # The id becomes a file name, so it must not carry path parts that would
    # resolve outside the metadata directory.
    if Path(paper_id).name != paper_id:
        raise HTTPException(status_code=400, detail=f"Invalid paper id '{paper_id}'")
    meta_path = config.PAPERS_METADATA_DIR / f"{paper_id}.json"
    if not meta_path.exists():
        raise HTTPException(status_code=404, detail=f"Metadata not found for paper '{paper_id}'")
    return json.loads(meta_path.read_text(encoding="utf-8"))
# extract_paper_metadata function · python · L261-L303 (43 LOC)python/handlers/papers.py
async def extract_paper_metadata(paper_id: str):
doi_match = re.match(r"(10\.\d{4,})_(.*)", paper_id)
if not doi_match:
raise HTTPException(status_code=400, detail="Cannot reconstruct DOI from paper_id")
doi = f"{doi_match.group(1)}/{doi_match.group(2)}"
async with httpx.AsyncClient(timeout=10.0) as client:
try:
resp = await client.get(
f"https://api.crossref.org/works/{doi}",
headers={"User-Agent": "ChemLit-Desktop/1.0 (mailto:[email protected])"},
)
except httpx.RequestError as exc:
raise HTTPException(status_code=502, detail=f"CrossRef request failed: {exc}")
if resp.status_code == 404:
raise HTTPException(status_code=404, detail=f"DOI '{doi}' not found in CrossRef")
if resp.status_code != 200:
raise HTTPException(status_code=502, detail=f"CrossRef returned HTTP {resp.status_code}")
data = resp.json().get("message", {})
metadata = {
get_indexer function · python · L18-L22 (5 LOC)python/handlers/search.py
def get_indexer() -> VaultIndexer:
    """Return the shared VaultIndexer, creating it on first use."""
    global _indexer
    if _indexer is not None:
        return _indexer
    _indexer = VaultIndexer()
    return _indexer
# search function · python · L26-L37 (12 LOC)python/handlers/search.py
async def search(
    q: str = Query(..., min_length=1, description="Search query"),
    scope: str = Query("all", description="Search scope: all, papers, notes, research"),
    limit: int = Query(20, ge=1, le=100, description="Max results"),
):
    """Full-text search across papers, notes, and research documents."""
    allowed_scopes = ("all", "papers", "notes", "research")
    if scope not in allowed_scopes:
        raise HTTPException(status_code=400, detail=f"Invalid scope '{scope}'")

    hits = get_indexer().search(q, scope=scope, limit=limit)
    return {
        "query": q,
        "scope": scope,
        "count": len(hits),
        "results": hits,
    }
# rebuild_index function · python · L41-L45 (5 LOC)python/handlers/search.py
async def rebuild_index():
    """Rebuild the full-text search index from vault files."""
    per_type_counts = get_indexer().rebuild_all()
    return {"status": "ok", "indexed": per_type_counts}
# index_stats function · python · L49-L52 (4 LOC)python/handlers/search.py
async def index_stats():
    """Return index statistics."""
    return get_indexer().get_stats()
# Provenance: Repobility (https://repobility.com) — every score reproducible from /scan/
index_single_paper function · python · L56-L60 (5 LOC)python/handlers/search.py
async def index_single_paper(paper_id: str):
    """Index or re-index a single paper."""
    get_indexer().index_paper(paper_id)
    return {"status": "ok", "paper_id": paper_id}
# FileChangeEvent class · python · L63-L66 (4 LOC)python/handlers/search.py
class FileChangeEvent(BaseModel):
    """A single file change notification from Tauri's fs watcher."""
    # Path of the file that changed.
    path: str
    # Kind of change; the expected values are listed alongside.
    event: str  # "create" | "modify" | "remove"
# FileChangeBatch class · python · L69-L71 (3 LOC)python/handlers/search.py
class FileChangeBatch(BaseModel):
    """Batch of file change events."""
    # Events delivered together in one request.
    changes: List[FileChangeEvent]
# notify_file_changes function · python · L75-L129 (55 LOC)python/handlers/search.py
async def notify_file_changes(batch: FileChangeBatch):
"""
Watcher-indexer bridge: accept file change events from Tauri
and re-index affected documents.
Tauri's fs watcher detects changes and POSTs them here so the
search index stays up-to-date without polling.
"""
indexer = get_indexer()
indexed = {"papers": 0, "notes": 0, "research": 0, "skipped": 0}
for change in batch.changes:
file_path = Path(change.path)
if not file_path.exists() and change.event != "remove":
indexed["skipped"] += 1
continue
# Determine what type of document changed based on path
try:
rel = file_path.relative_to(config.VAULT_PATH)
except ValueError:
indexed["skipped"] += 1
continue
parts = rel.parts
if len(parts) >= 2 and parts[0] == "01_Papers":
# Paper file changed — extract paper_id from filename
paper_id = file_path.stem
AIProvider class · python · L16-L22 (7 LOC)python/handlers/settings.py
class AIProvider(BaseModel):
    # Full definition of an AI provider, as submitted when adding one.
    # Unique name; providers are stored and looked up under this key.
    name: str
    # API flavour the provider speaks.
    type: Literal["openai_compatible", "anthropic"]
    # Root URL of the provider's API.
    base_url: str
    # Secret key; masked before providers are listed.
    api_key: str
    # Model identifier sent with requests.
    model: str
    # Sampling temperature; defaults to 0.7.
    temperature: float = 0.7
# AIProviderUpdate class · python · L25-L30 (6 LOC)python/handlers/settings.py
class AIProviderUpdate(BaseModel):
    # Partial update for a stored provider: every field is optional, and a
    # field left as None keeps its stored value.
    type: Optional[Literal["openai_compatible", "anthropic"]] = None
    base_url: Optional[str] = None
    api_key: Optional[str] = None
    model: Optional[str] = None
    temperature: Optional[float] = None
# _load_providers function · python · L37-L44 (8 LOC)python/handlers/settings.py
def _load_providers() -> Dict[str, dict]:
    """Load the stored provider configurations, keyed by provider name.

    Always returns a dict. A missing, unreadable or malformed file, or a file
    whose top-level JSON value is not an object, yields ``{}`` so callers can
    iterate over it and assign into it safely.
    """
    try:
        data = json.loads(_providers_file().read_text(encoding="utf-8"))
    except (OSError, ValueError):
        # OSError: missing or unreadable file. ValueError: invalid JSON or
        # invalid UTF-8 (JSONDecodeError and UnicodeDecodeError subclass it).
        return {}
    return data if isinstance(data, dict) else {}
# _save_providers function · python · L47-L49 (3 LOC)python/handlers/settings.py
def _save_providers(providers: Dict[str, dict]):
    """Write all provider configurations to the providers file as indented JSON.

    The config directory is created first if it does not exist.
    """
    config.CONFIG_DIR.mkdir(parents=True, exist_ok=True)
    target = _providers_file()
    payload = json.dumps(providers, ensure_ascii=False, indent=2)
    target.write_text(payload, encoding="utf-8")
# Repobility — same analyzer, your code, free for public repos · /scan/
_mask_key function · python · L52-L55 (4 LOC)python/handlers/settings.py
def _mask_key(key: str) -> str:
    """Hide the middle of an API key for display.

    Keys longer than eight characters keep their first and last four
    characters around ``***``. Shorter keys are replaced by ``***`` entirely.
    """
    if len(key) > 8:
        head, tail = key[:4], key[-4:]
        return f"{head}***{tail}"
    return "***"
# list_providers function · python · L59-L66 (8 LOC)python/handlers/settings.py
async def list_providers():
    """Return every stored provider configuration with its API key masked."""
    return [
        {**cfg, "api_key": _mask_key(cfg.get("api_key", ""))}
        for cfg in _load_providers().values()
    ]
# add_provider function · python · L70-L76 (7 LOC)python/handlers/settings.py
async def add_provider(provider: AIProvider):
    """Store a new provider under its name.

    Raises:
        HTTPException: 409 when a provider with that name already exists.
    """
    known = _load_providers()
    label = provider.name
    if label in known:
        raise HTTPException(status_code=409, detail=f"Provider '{provider.name}' already exists")

    known[label] = provider.model_dump()
    _save_providers(known)
    return {"message": f"Provider '{provider.name}' added successfully"}
# update_provider function · python · L80-L89 (10 LOC)python/handlers/settings.py
async def update_provider(name: str, update: AIProviderUpdate):
    """Apply the non-None fields of ``update`` to a stored provider.

    Raises:
        HTTPException: 404 when no provider with that name exists.
    """
    known = _load_providers()
    if name not in known:
        raise HTTPException(status_code=404, detail=f"Provider '{name}' not found")

    entry = known[name]
    # Fields left as None are treated as "not supplied" and keep their value.
    for field, value in update.model_dump().items():
        if value is not None:
            entry[field] = value
    known[name] = entry
    _save_providers(known)
    return {"message": f"Provider '{name}' updated successfully"}
# delete_provider function · python · L93-L99 (7 LOC)python/handlers/settings.py
async def delete_provider(name: str):
    """Remove a stored provider.

    Raises:
        HTTPException: 404 when no provider with that name exists.
    """
    known = _load_providers()
    if name not in known:
        raise HTTPException(status_code=404, detail=f"Provider '{name}' not found")

    known.pop(name)
    _save_providers(known)
    return {"message": f"Provider '{name}' deleted successfully"}
# test_provider function · python · L103-L149 (47 LOC)python/handlers/settings.py
async def test_provider(name: str):
providers = _load_providers()
if name not in providers:
raise HTTPException(status_code=404, detail=f"Provider '{name}' not found")
cfg = providers[name]
provider_type = cfg.get("type", "openai_compatible")
base_url = cfg["base_url"].rstrip("/")
api_key = cfg["api_key"]
model = cfg["model"]
temperature = cfg.get("temperature", 0.7)
start = time.time()
try:
async with httpx.AsyncClient(timeout=15.0) as client:
if provider_type == "openai_compatible":
resp = await client.post(
f"{base_url}/chat/completions",
json={"model": model, "messages": [{"role": "user", "content": "Hello"}], "max_tokens": 50, "temperature": temperature},
headers={"Authorization": f"Bearer {api_key}", "Content-Type": "application/json"},
)
latency_ms = round((time.time() - start) * 1000, 1)
_count_notes function · python · L11-L18 (8 LOC)python/handlers/vault.py
def _count_notes() -> int:
    """Count Markdown notes under the notes directory, recursively.

    Files whose name contains ``template`` (case-insensitive) are left out.
    Returns 0 when the notes directory does not exist.
    """
    notes_root = config.NOTES_DIR
    if not notes_root.exists():
        return 0
    return sum(
        1
        for md_file in notes_root.rglob("*.md")
        if "template" not in md_file.name.lower()
    )
# get_vault_stats function · python · L22-L42 (21 LOC)python/handlers/vault.py
async def get_vault_stats():
    """Report file counts and research-file presence for the current vault.

    Directories that do not exist count as zero. The metadata count leaves
    out ``index.json``.
    """
    pdf_dir = config.PAPERS_PDF_DIR
    md_dir = config.PAPERS_MD_DIR
    meta_dir = config.PAPERS_METADATA_DIR

    pdf_count = 0
    if pdf_dir.exists():
        pdf_count = len(list(pdf_dir.glob("*.pdf")))

    md_count = 0
    if md_dir.exists():
        md_count = len(list(md_dir.glob("*.md")))

    metadata_count = 0
    if meta_dir.exists():
        metadata_count = len([f for f in meta_dir.glob("*.json") if f.stem != "index"])

    note_count = _count_notes()

    # Chapters are the Markdown files in the "正文" (main text) folder.
    chapters_dir = config.WRITING_DIR / "正文"
    chapter_count = 0
    if chapters_dir.exists():
        chapter_count = len(list(chapters_dir.glob("*.md")))

    research_files_status = {
        filename: (config.RESEARCH_DIR / filename).exists()
        for filename in config.RESEARCH_FILES
    }

    return {
        "pdf_count": pdf_count,
        "md_count": md_count,
        "metadata_count": metadata_count,
        "note_count": note_count,
        "chapter_count": chapter_count,
        "research_files": research_files_status,
        "vault_path": str(config.VAULT_PATH),
    }
# Repobility · MCP-ready · https://repobility.com
get_research_file function · python · L46-L52 (7 LOC)python/handlers/vault.py
async def get_research_file(filename: str):
    """Return the text of one of the configured research files.

    Raises:
        HTTPException: 400 when ``filename`` is not in config.RESEARCH_FILES,
            404 when the file has not been created yet.
    """
    recognised = filename in config.RESEARCH_FILES
    if not recognised:
        raise HTTPException(status_code=400, detail=f"Unknown research file '{filename}'")

    target = config.RESEARCH_DIR / filename
    if target.exists():
        return target.read_text(encoding="utf-8")
    raise HTTPException(status_code=404, detail=f"Research file '{filename}' does not exist")
# IngestRequest class · python · L36-L39 (4 LOC)python/handlers/workflows.py
class IngestRequest(BaseModel):
    # Request payload for an ingest workflow (the handler is outside this view).
    # Identifier of the paper to ingest.
    paper_id: str
    # Optional PDF location; None when not supplied.
    pdf_path: Optional[str] = None
    # Optional provider name; presumably selects a configured AI provider -
    # TODO confirm against the workflow handler.
    provider_name: Optional[str] = None
# WriteRequest class · python · L42-L44 (3 LOC)python/handlers/workflows.py
class WriteRequest(BaseModel):
    # Request payload for a writing workflow (the handler is outside this view).
    # Name of the chapter to work on.
    chapter_name: str
    # Optional provider name; presumably selects a configured AI provider -
    # TODO confirm against the workflow handler.
    provider_name: Optional[str] = None
# page 1 / 5next ›