← back to dropdownlogistics__dex-rag

Function bodies 161 total

All specs Real LLM only Function bodies
append_reset_log function · python · L157-L167 (11 LOC)
dex-ingest.py
def append_reset_log(entry: dict) -> None:
    """
    Append ``entry`` to the reset log (RESET_LOG) as one JSON line.

    The log is forensic only: it is a paper trail, while the backup is the
    real recovery path. Any failure while opening, serialising or writing
    is therefore reported on stderr and swallowed so the reset itself is
    never blocked.
    """
    try:
        with open(RESET_LOG, "a", encoding="utf-8") as log_file:
            # Serialise inside the guarded region so an unserialisable
            # entry is downgraded to a warning as well.
            log_file.write(f"{json.dumps(entry)}\n")
    except Exception as exc:
        print(f"    WARN: reset log write failed (non-blocking): {exc}", file=sys.stderr)
get_embedding function · python · L173-L184 (12 LOC)
dex-ingest.py
def get_embedding(text: str) -> Optional[List[float]]:
    """
    Ask the embedding endpoint at OLLAMA_URL for a vector for ``text``.

    Returns the value stored under "embedding" in the JSON reply (None when
    the key is absent). Any failure (transport error, non-2xx status,
    undecodable or unexpected reply) is printed and reported as None
    rather than raised.
    """
    payload = {"model": EMBED_MODEL, "prompt": text}
    try:
        response = requests.post(OLLAMA_URL, json=payload, timeout=60)
        response.raise_for_status()
        reply = response.json()
        # Kept inside the guarded region: a non-dict reply must also yield None.
        return reply.get("embedding")
    except Exception as exc:
        print(f"    ? Embedding failed: {exc}")
        return None
chunk_text function · python · L190-L236 (47 LOC)
dex-ingest.py
def chunk_text(text: str) -> List[str]:
    """
    Chunk a text string into overlapping segments.

    This is intentionally simple and safe for most files.
    Guardrail for extremely large files is handled before calling this.
    """
    char_size = CHUNK_SIZE_TOKENS * CHARS_PER_TOKEN
    char_overlap = CHUNK_OVERLAP_TOKENS * CHARS_PER_TOKEN

    t = (text or "").strip()
    if not t:
        return []
    if len(t) <= char_size:
        return [t]

    out: List[str] = []
    start = 0
    n = len(t)

    while start < n:
        end = min(n, start + char_size)

        # Prefer breaking on paragraph/sentence boundary (best-effort)
        if end < n:
            para = t.rfind("\n\n", start + char_size // 2, end)
            if para > start:
                end = para + 2
            else:
                for sep in [". ", ".\n", "!\n", "?\n", "! ", "? "]:
                    brk = t.rfind(sep, start + char_size // 2, end)
                    if brk > start:
                     
sha256_file function · python · L242-L250 (9 LOC)
dex-ingest.py
def sha256_file(path: str) -> Optional[str]:
    """
    Return the hexadecimal SHA-256 digest of the file at ``path``.

    The file is read in 8192-byte blocks so large files are never loaded
    whole. Returns None if the file cannot be opened or read.
    """
    digest = hashlib.sha256()
    try:
        with open(path, "rb") as stream:
            while True:
                block = stream.read(8192)
                if not block:
                    # An empty read marks end of file.
                    break
                digest.update(block)
    except Exception:
        return None
    return digest.hexdigest()
read_text_file function · python · L253-L261 (9 LOC)
dex-ingest.py
def read_text_file(path: str) -> Optional[str]:
    """
    Read ``path`` as text, trying a short list of encodings in order.

    Encodings tried: "utf-8-sig", then "cp1252", then "latin-1".
    "utf-8-sig" decodes plain UTF-8 exactly like "utf-8" but also strips a
    leading byte-order mark, so it must come first; tried after "utf-8" it
    is never reached and the BOM leaks into the text as U+FEFF.

    Returns:
        The decoded file contents, or None when the file cannot be opened
        or read at all.
    """
    for enc in ("utf-8-sig", "cp1252", "latin-1"):
        try:
            with open(path, "r", encoding=enc) as f:
                return f.read()
        except UnicodeDecodeError:
            # Wrong codec for these bytes: fall through to the next one.
            continue
        except Exception:
            # Not a decoding problem (missing file, permissions, bad path):
            # another codec cannot help, so stop instead of retrying.
            return None
    return None
scan_archive function · python · L264-L287 (24 LOC)
dex-ingest.py
def scan_archive(root: str, extensions: Optional[set] = None) -> List[dict]:
    """
    Walk ``root`` and describe every file eligible for ingestion.

    Directories whose name starts with "." are pruned from the walk. A file
    is kept when its name is not in SKIP_FILENAMES and its lower-cased
    extension is in ``extensions`` (PHASE1_EXTENSIONS when None is given).

    Returns:
        One dict per kept file with keys "path", "rel_path" (relative to
        ``root``), "filename", "extension" and "folder" (name of the
        directory that directly contains the file).
    """
    wanted = PHASE1_EXTENSIONS if extensions is None else extensions
    found = []
    for current_dir, subdirs, filenames in os.walk(root):
        # In-place slice assignment is what tells os.walk to skip these.
        subdirs[:] = [d for d in subdirs if not d.startswith(".")]
        for filename in filenames:
            if filename in SKIP_FILENAMES:
                continue
            suffix = os.path.splitext(filename)[1].lower()
            if suffix not in wanted:
                continue
            absolute = os.path.join(current_dir, filename)
            record = {
                "path": absolute,
                "rel_path": os.path.relpath(absolute, root),
                "filename": filename,
                "extension": suffix,
                "folder": os.path.basename(current_dir),
            }
            found.append(record)
    return found
ingest function · python · L293-L627 (335 LOC)
dex-ingest.py
def ingest(archive_path: str, reset: bool = False, build_canon: bool = False, fast: bool = False,
           collection: Optional[str] = None, ext_filter: Optional[set] = None,
           nominated_by: Optional[str] = None, skip_backup_check: bool = False) -> None:
    import chromadb

    ext_list = ", ".join(sorted(ext_filter if ext_filter else PHASE1_EXTENSIONS))

    print("\n" + "=" * 60)
    print("  DEX JR. RAG PIPELINE - PHASE 1 INGESTION")
    print(f"  Archive: {archive_path}")
    if collection:
        print(f"  Mode: SCOPED COLLECTION -> {collection}")
    else:
        print(f"  Mode: {'BUILD CANON' if build_canon else 'NORMAL'}")
    print(f"  Extensions: {ext_list}")
    if nominated_by:
        print(f"  Nominated by: {nominated_by}")
    print("=" * 60)

    # Generate ingest run id (HHMMSS precision to avoid same-minute collisions)
    ingest_run_id = f"manual_{datetime.now(timezone.utc).strftime('%Y-%m-%d_%H%M%S')}"
    print(f"  Ingest run id: {ingest_run_id}")

  
Open data scored by Repobility · https://repobility.com
main function · python · L630-L685 (56 LOC)
dex-ingest.py
def main() -> None:
    p = argparse.ArgumentParser(description="DEX JR RAG - Ingest")
    p.add_argument("--path", type=str, required=True, help="Archive path")
    p.add_argument("--reset", action="store_true", help="Reset collections")
    p.add_argument(
        "--build-canon",
        action="store_true",
        help="Backfill CANON only; do not expand RAW",
    )
    p.add_argument(
        "--collection",
        type=str,
        default=None,
        help="Route ingest to a named scoped collection (e.g. dex_code, ext_reference). Bypasses RAW/CANON routing.",
    )
    p.add_argument(
        "--ext-filter",
        nargs="+",
        default=None,
        help="Limit ingest to specific extensions (e.g. --ext-filter .py .cs .ts). Dot prefix required.",
    )
    p.add_argument(
        "--nominated-by",
        type=str,
        default=None,
        help="Tag chunks with nominating seat(s) for ext_ collections (e.g. '1002,1004').",
    )
    p.add_argument(
        "--fast",
chunk_text function · python · L40-L53 (14 LOC)
dex-ingest-text.py
def chunk_text(text: str, chunk_size: int = CHUNK_SIZE_CHARS,
               overlap: int = CHUNK_OVERLAP_CHARS) -> list[str]:
    """
    Simple character-based chunker with overlap. Returns list of strings.

    Text no longer than ``chunk_size`` is returned as a single chunk (an
    empty string yields ``[""]``). Longer text is cut into windows of
    ``chunk_size`` characters, each starting ``chunk_size - overlap``
    characters after the previous one.

    Raises:
        ValueError: if the text needs splitting but ``chunk_size`` is not
            positive, or ``overlap`` is not smaller than ``chunk_size``.
            The window would then never advance and the loop would spin
            forever, appending chunks until memory runs out.
    """
    total = len(text)
    if total <= chunk_size:
        return [text]
    if chunk_size <= 0:
        raise ValueError(f"chunk_size must be positive, got {chunk_size}")
    step = chunk_size - overlap
    if step <= 0:
        raise ValueError(
            f"overlap ({overlap}) must be smaller than chunk_size ({chunk_size})"
        )

    chunks = []
    for start in range(0, total, step):
        end = start + chunk_size
        chunks.append(text[start:end])
        if end >= total:
            # This window reached the end of the text; do not emit a
            # trailing chunk that only repeats the overlap.
            break
    return chunks
get_embedding function · python · L56-L66 (11 LOC)
dex-ingest-text.py
def get_embedding(text: str) -> list[float]:
    """
    Generate an embedding via Ollama.

    Posts ``text`` with EMBEDDING_MODEL to the local embeddings endpoint
    and returns the "embedding" field of the reply. HTTP errors and a
    missing "embedding" key propagate to the caller.
    """
    # Matches dex-ingest.py:45-46,105-113 and dex_weights.py:23,62-69
    import requests

    payload = {"model": EMBEDDING_MODEL, "prompt": text}
    reply = requests.post(
        "http://localhost:11434/api/embeddings",
        json=payload,
        timeout=60,
    )
    reply.raise_for_status()
    body = reply.json()
    return body["embedding"]
infer_source_type function · python · L69-L82 (14 LOC)
dex-ingest-text.py
def infer_source_type(filename: str) -> str:
    """
    Infer source_type from filename per STD-DDL-METADATA-001 "Inference rules".

    Rules are checked in order and the first match wins:
      - "DDLCouncilReview_" prefix               -> "council_review"
      - "SYNTH-" prefix or "_SYNTH." anywhere    -> "council_synthesis"
      - "ADR-", "STD-", "PRO-" or "CR-" prefix   -> "governance"
      - "sweep_" or "audit_" prefix, ".md" end   -> "system_telemetry"
      - anything else                            -> "unknown"

    Matching is case-sensitive.
    """
    if filename.startswith("DDLCouncilReview_"):
        return "council_review"
    if "_SYNTH." in filename or filename.startswith("SYNTH-"):
        return "council_synthesis"
    # str.startswith accepts a tuple, covering every governance prefix at once.
    if filename.startswith(("ADR-", "STD-", "PRO-", "CR-")):
        return "governance"
    if filename.endswith(".md") and filename.startswith(("sweep_", "audit_")):
        return "system_telemetry"
    return "unknown"
ingest_text_file function · python · L85-L201 (117 LOC)
dex-ingest-text.py
def ingest_text_file(
    source_path: Path,
    collection_name: str,
    dry_run: bool = False,
) -> dict:
    """
    Ingest a single plain-text file end-to-end through the pipeline.

    Returns a summary dict with keys: file, chunks_written, source_type,
    collection, verified, moved_to, dry_run.
    """
    # Step 1: Validate input
    if not source_path.exists():
        raise FileNotFoundError(f"source file does not exist: {source_path}")
    if source_path.suffix.lower() not in (".txt", ".md"):
        raise ValueError(
            f"dex-ingest-text only handles .txt and .md, got: {source_path.suffix}"
        )

    # Step 2: Read file
    text = source_path.read_text(encoding="utf-8", errors="replace")
    if not text.strip():
        raise ValueError(f"source file is empty: {source_path}")

    # Step 3: Chunk
    chunks = chunk_text(text)
    chunk_total = len(chunks)
    print(f"  Chunked into {chunk_total} chunks")

    # Step 3.5: Backup pre-flight (Trigger 3 of STD-D
main function · python · L204-L227 (24 LOC)
dex-ingest-text.py
def main():
    """
    Command-line entry point for the plain-text ingest.

    Parses the source path, target collection and dry-run flag, echoes
    them, runs ingest_text_file on the resolved path and prints every
    key/value pair of the returned summary.
    """
    cli = argparse.ArgumentParser(description="Plain-text ingest for dex-rag pipeline")
    cli.add_argument("source", help="Path to .txt or .md file to ingest")
    cli.add_argument(
        "--collection",
        default="dex_test",
        help="Target ChromaDB collection (default: dex_test)",
    )
    cli.add_argument(
        "--dry-run",
        action="store_true",
        help="Validate and chunk only — no embeddings, no writes, no move",
    )
    opts = cli.parse_args()

    source = Path(opts.source).resolve()
    print(f"Ingesting: {source}")
    print(f"Target collection: {opts.collection}")
    print(f"Dry run: {opts.dry_run}")
    print()

    summary = ingest_text_file(source, opts.collection, dry_run=opts.dry_run)

    rule = "─" * 60
    print()
    print(rule)
    print("SUMMARY")
    print(rule)
    for key, value in summary.items():
        # Keys are left-aligned in a 20-character column.
        print(f"  {key:20} {value}")
    print()
log function · python · L41-L45 (5 LOC)
dex-ocr.py
def log(message: str):
    """Append ``message`` to LOG_FILE, prefixed with the local time in brackets."""
    stamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    with open(LOG_FILE, "a", encoding="utf-8") as handle:
        handle.write(f"[{stamp}] {message}\n")
get_image_files function · python · L48-L57 (10 LOC)
dex-ocr.py
def get_image_files(directory: str) -> list[Path]:
    """
    List the image files directly inside ``directory``, sorted by path.

    Only regular files whose lower-cased suffix is in IMAGE_EXTENSIONS are
    returned; subdirectories are not descended into.

    If ``directory`` is missing or is not a directory, an error is printed
    and the process exits with status 1. Checking ``is_dir()`` rather than
    ``exists()`` matters: a path to a regular file exists, but iterating it
    would raise NotADirectoryError instead of exiting cleanly.
    """
    root = Path(directory)
    if not root.is_dir():
        print(f"ERROR: Directory not found: {directory}")
        sys.exit(1)
    return sorted(
        entry for entry in root.iterdir()
        if entry.is_file() and entry.suffix.lower() in IMAGE_EXTENSIONS
    )
All rows scored by the Repobility analyzer (https://repobility.com)
build_header function · python · L68-L78 (11 LOC)
dex-ocr.py
def build_header(image_path: Path, img: Image.Image) -> str:
    """
    Build the metadata header placed in front of an image's OCR text.

    The header gives the file name, the resolved path, the pixel
    dimensions read from ``img.size``, the current local time and a fixed
    "screenshot" type, followed by a 60-character "=" rule and one blank
    line.
    """
    width, height = img.size
    timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    header_lines = [
        f"SOURCE: {image_path.name}",
        f"FULL_PATH: {image_path.resolve()}",
        f"DIMENSIONS: {width}x{height}px",
        f"OCR_TIMESTAMP: {timestamp}",
        "TYPE: screenshot",
        "=" * 60,
    ]
    # Every line ends in a newline; the rule is followed by a blank line.
    return "\n".join(header_lines) + "\n\n"
run_ocr function · python · L81-L93 (13 LOC)
dex-ocr.py
def run_ocr(image_path: Path) -> tuple[str, str | None]:
    """
    OCR one image and return its text prefixed with a metadata header.

    Returns (text, error). error is None on success. On any failure the
    text is "" and error holds the exception message; nothing is raised.
    """
    try:
        # Open via context manager so the underlying file handle is released
        # even when OCR fails; otherwise a large batch leaks one handle per
        # image.
        with Image.open(image_path) as source:
            # Convert to RGB if needed (handles RGBA, palette mode, etc.)
            img = source if source.mode in ("RGB", "L") else source.convert("RGB")
            text = pytesseract.image_to_string(img, config="--psm 3")
            header = build_header(image_path, img)
        return header + text.strip(), None
    except Exception as e:
        return "", str(e)
trigger_ingest function · python · L100-L108 (9 LOC)
dex-ocr.py
def trigger_ingest():
    """
    Run INGEST_SCRIPT against CANON_PATH with --build-canon.

    The script is started with the current interpreter and its output is
    not captured. A non-zero exit status only produces a warning; nothing
    is raised or returned.
    """
    print("\n  Triggering canon ingestion...")
    command = [sys.executable, INGEST_SCRIPT, "--path", CANON_PATH, "--build-canon"]
    completed = subprocess.run(command, capture_output=False)
    if completed.returncode:
        print("  WARNING: Ingest returned non-zero exit code.")
main function · python · L113-L205 (93 LOC)
dex-ocr.py
def main():
    parser = argparse.ArgumentParser(
        description="dex-ocr.py — Convert screenshots to .txt for Dex Jr. RAG corpus"
    )
    parser.add_argument("--dir",             required=True,  help="Input folder of images")
    parser.add_argument("--out-dir",         default=DEFAULT_OUT_DIR, help="Output folder for .txt files")
    parser.add_argument("--ingest",          action="store_true", help="Trigger canon ingestion after conversion")
    parser.add_argument("--preview",         action="store_true", help="Print OCR output without saving")
    parser.add_argument("--skip-processed",  action="store_true", help="Skip files already converted")
    args = parser.parse_args()

    out_dir = Path(args.out_dir)
    if not args.preview:
        out_dir.mkdir(parents=True, exist_ok=True)

    images = get_image_files(args.dir)
    total = len(images)

    if total == 0:
        print(f"No image files found in: {args.dir}")
        sys.exit(0)

    print(f"\n  dex-ocr.py — Screen
BackupFailedError class · python · L33-L35 (3 LOC)
dex_pipeline.py
class BackupFailedError(Exception):
    """Signal that a backup cannot be relied on.

    Covers two situations: a backup attempt ran and failed validation, or
    the most recent backup is structurally corrupt (sqlite unreadable,
    etc.).

    NOTE(review): the code that raises this is outside the visible excerpt.
    """
build_chunk_metadata function · python · L62-L167 (106 LOC)
dex_pipeline.py
def build_chunk_metadata(
    source_file: str,
    source_path: str,
    source_type: str,
    ingest_run_id: str,
    chunk_index: int,
    chunk_total: int,
    ingested_at: Optional[str] = None,
) -> dict:
    """
    Build a validated metadata dict for a single chunk.

    Per STD-DDL-METADATA-001. All seven fields are mandatory.
    Validates per the standard's validation rules.
    Raises ValueError on validation failure with a specific message
    naming the field that failed.

    Args:
        source_file: Filename only, not the path. e.g. "DDLCouncilReview_AntiPractice.txt"
        source_path: Full absolute path at time of ingest.
        source_type: One of VALID_SOURCE_TYPES. Use "unknown" if uncertain.
        ingest_run_id: e.g. "sweep_2026-04-11_0300", "manual_2026-04-11_1842",
                       "backfill_2026-04-11", or "test_<purpose>_2026-04-11".
        chunk_index: 0-indexed position within source file.
        chunk_total: total number of chunks from this so
verify_ingest function · python · L170-L208 (39 LOC)
dex_pipeline.py
def verify_ingest(
    collection_name: str,
    source_file: str,
    expected_chunk_count: int,
) -> tuple[bool, int]:
    """
    Verify that the expected number of chunks for a given source_file
    landed in the target collection.

    Read-only operation. Queries by metadata filter, no writes.

    Args:
        collection_name: e.g. "dex_canon", "ddl_archive"
        source_file: filename to query for (matches metadata source_file)
        expected_chunk_count: how many chunks the ingest path attempted to write

    Returns:
        Tuple of (success: bool, actual_count: int).
        success is True iff actual_count == expected_chunk_count.
    """
    # Use the same client pattern as dex_weights.py
    # Local import to avoid hard dependency at module-load time
    from dex_weights import get_client

    client = get_client()
    try:
        collection = client.get_collection(collection_name)
    except Exception as e:
        # Collection doesn't exist or can't be reached — 
move_to_staging function · python · L211-L277 (67 LOC)
dex_pipeline.py
def move_to_staging(source_path: str) -> Path:
    """
    Atomically move a file from DDL_Ingest to DDL_Staging,
    preserving its relative path structure within the inbox.

    A file at DDL_Ingest/foo/bar/baz.txt becomes
    DDL_Staging/foo/bar/baz.txt — same relative path, different root.

    If the destination directory doesn't exist, it's created. If a
    file already exists at the destination (collision), the function
    raises FileExistsError rather than overwriting — collisions
    require operator review.

    Per ADR-INGEST-PIPELINE-001 §"Three folders, three states":
    nothing is ever deleted. This function only moves forward.

    Args:
        source_path: Absolute path to a file inside DDL_Ingest.

    Returns:
        Path object pointing to the new location in DDL_Staging.

    Raises:
        ValueError: if source_path is not inside DDL_INGEST_ROOT.
        FileNotFoundError: if source_path doesn't exist.
        FileExistsError: if destination already exists (n
If a scraper extracted this row, it came from Repobility (https://repobility.com)
ensure_backup_current function · python · L280-L403 (124 LOC)
dex_pipeline.py
def ensure_backup_current(
    expected_write_chunks: int = 0,
    force_check: bool = False,
    dry_run: bool = False,
) -> dict:
    """
    Ensure the corpus backup is current per STD-DDL-BACKUP-001 §"Trigger 3".

    Trigger 3 is the pre-write gate. This helper is the in-pipeline gate
    that ingest paths call before writing chunks. dex-backup.py is the
    single source of truth for trigger logic and the actual copy — this
    helper just shells out to it via subprocess and interprets the result.

    Args:
        expected_write_chunks: how many chunks the caller intends to write.
            If > 100, Trigger 5 fires regardless of other triggers.
        force_check: if True, run a backup regardless of trigger state.
            (Interpreted as "force-run a backup," consistent with the
            self-test "force_check=True → backup_ran=True" expectation.)
        dry_run: if True, --dry-run is added to the dex-backup.py
            invocation. Trigger logic still runs and th
get_embedding function · python · L14-L21 (8 LOC)
dex-query.py
def get_embedding(text: str):
    """
    Return the embedding for ``text`` from the endpoint at OLLAMA_URL.

    Sends EMBED_MODEL and the text as the prompt, with a 60-second timeout.
    HTTP errors and a missing "embedding" key propagate to the caller.
    """
    payload = {"model": EMBED_MODEL, "prompt": text}
    response = requests.post(OLLAMA_URL, json=payload, timeout=60)
    response.raise_for_status()
    body = response.json()
    return body["embedding"]
main function · python · L23-L122 (100 LOC)
dex-query.py
def main():
    parser = argparse.ArgumentParser()
    parser.add_argument("query", nargs="?", help="Search query")
    parser.add_argument("--top", type=int, default=DEFAULT_TOP_N)
    parser.add_argument("--raw", action="store_true", help="Search RAW archive instead of canon (unweighted)")
    parser.add_argument("--external", action="store_true", help="Include ext_canon and ext_archive in search")
    parser.add_argument("--stats", action="store_true", help="Show DB stats")
    parser.add_argument("--weight-stats", action="store_true", help="Show source weight table and exit")
    args = parser.parse_args()

    # Weight table — no DB needed
    if args.weight_stats:
        print_weight_stats()
        return

    # DB stats — open client here only
    if args.stats:
        client = chromadb.PersistentClient(path=CHROMA_DIR)
        canon   = client.get_collection("dex_canon").count()
        archive = client.get_collection("ddl_archive").count()
        print("\n================ 
get_embedding function · python · L71-L74 (4 LOC)
dex-search-api.py
def get_embedding(text):
    # Fetch the embedding for `text` from OLLAMA_URL using EMBED_MODEL
    # (30-second timeout). HTTP errors and a missing "embedding" key
    # propagate to the caller.
    payload = {"model": EMBED_MODEL, "prompt": text}
    reply = req.post(OLLAMA_URL, json=payload, timeout=30)
    reply.raise_for_status()
    body = reply.json()
    return body["embedding"]
get_rag_context function · python · L76-L88 (13 LOC)
dex-search-api.py
def get_rag_context(query, collection, top_n=3):
    """
    Pull relevant MindFrame context from the corpus.

    Embeds ``query``, asks ``collection`` for the ``top_n`` nearest
    documents and joins the non-empty ones with a "---" separator line.

    This is best-effort: if embedding or querying fails for any ordinary
    reason, an empty string is returned so the caller can carry on without
    context. Only ``Exception`` is caught. A bare ``except`` would also
    swallow KeyboardInterrupt and SystemExit and could keep the process
    from shutting down.
    """
    try:
        embedding = get_embedding(query)
        results = collection.query(
            query_embeddings=[embedding],
            n_results=top_n,
            include=["documents"],
        )
        chunks = [doc for doc in results["documents"][0] if doc]
        return "\n\n---\n\n".join(chunks)
    except Exception:
        return ""
ChatMessage class · python · L109-L111 (3 LOC)
dex-search-api.py
class ChatMessage(BaseModel):
    # One chat message: who said it and what was said.
    # NOTE(review): presumably the element type of ChatRequest.messages;
    # ChatRequest is not visible here — confirm.
    role: str  # speaker label; mindframe_chat compares message roles with "user"
    content: str  # the message text
root function · python · L117-L120 (4 LOC)
dex-search-api.py
def root():
    # Service status payload: fixed identity fields plus the current item
    # count of each collection, or 0 when that collection is falsy.
    canon_total = canon.count() if canon else 0
    archive_total = archive.count() if archive else 0
    return {
        "service": "DDL API",
        "status": "online",
        "version": "0.5.0",
        "canon": canon_total,
        "archive": archive_total,
    }
search function · python · L123-L133 (11 LOC)
dex-search-api.py
def search(q: str = Query(..., min_length=2), top: int = Query(5, ge=1, le=20), corpus: str = Query("canon")):
    """
    Semantic search over one collection.

    Args:
        q: query text (at least 2 characters).
        top: number of hits to request (1-20, default 5).
        corpus: "canon" selects the canon collection; any other value
            selects the archive collection.

    Returns:
        A dict with "query", "corpus", "count" and "results". Each result
        has "text" (first 500 characters of the document), "source" (the
        source_file metadata value, "?" when absent) and "distance"
        (rounded to 2 decimals). When the selected collection is not
        available, a dict with "error" and an empty "results" list.
    """
    collection = canon if corpus == "canon" else archive
    if not collection:
        return {"error": f"'{corpus}' not found", "results": []}
    embedding = get_embedding(q)
    results = collection.query(query_embeddings=[embedding], n_results=top, include=["documents", "metadatas", "distances"])
    hits = []
    # The query holds a single embedding, so index 0 of each field is its
    # hit list; the three lists are walked in lockstep.
    for doc, meta, dist in zip(results["documents"][0], results["metadatas"][0], results["distances"][0]):
        # A stored record may lack metadata or document text; treat both as
        # empty rather than failing the whole request.
        meta = meta or {}
        hits.append({"text": (doc or "")[:500], "source": meta.get("source_file", "?"), "distance": round(dist, 2)})
    return {"query": q, "corpus": corpus, "count": len(hits), "results": hits}
Repobility (the analyzer behind this table) · https://repobility.com
mindframe_chat function · python · L136-L173 (38 LOC)
dex-search-api.py
def mindframe_chat(request: ChatRequest):
    """MindFrame calibration chat. Sends conversation to Ollama with MindFrame system prompt + RAG context."""
    
    # Get the latest user message for RAG context
    latest_user_msg = ""
    for msg in reversed(request.messages):
        if msg.role == "user":
            latest_user_msg = msg.content
            break
    
    # Pull relevant MindFrame context from corpus
    rag_context = ""
    if canon and latest_user_msg:
        rag_context = get_rag_context("MindFrame calibration " + latest_user_msg, canon, top_n=3)
    
    # Build system prompt with RAG context
    system_prompt = MINDFRAME_SYSTEM
    if rag_context:
        system_prompt += f"\n\nRELEVANT DDL CONTEXT (from the archive):\n{rag_context}\n\nUse this context to inform your questions and observations, but don't quote it directly."
    
    # Build Ollama messages
    ollama_messages = [{"role": "system", "content": system_prompt}]
    for msg in request.messages:
     
run function · python · L12-L22 (11 LOC)
dex-setup.py
def run(cmd, label):
    """
    Run ``cmd`` through the shell under a banner showing ``label``.

    Output is captured, not streamed. A success line is printed when the
    exit status is 0; otherwise a failure line is printed, followed by the
    command's stripped stderr.

    Returns:
        True when the command exited with status 0, else False.
    """
    rule = "=" * 60
    print(f"\n{rule}\n  {label}\n{rule}")
    completed = subprocess.run(cmd, shell=True, capture_output=True, text=True)
    succeeded = completed.returncode == 0
    if not succeeded:
        print(f"  ✗ {label} — FAILED")
        print(f"    {completed.stderr.strip()}")
    else:
        print(f"  ✓ {label} — OK")
    return succeeded
main function · python · L24-L90 (67 LOC)
dex-setup.py
def main():
    print("\n" + "="*60)
    print("  DDL RAG PIPELINE — PHASE 1 SETUP")
    print("  Dex Jr. × Archive Integration")
    print("="*60)

    checks = []

    # 1. Check Ollama is running
    checks.append(run("ollama list", "Ollama is installed and running"))

    # 2. Pull embedding model
    checks.append(run("ollama pull nomic-embed-text", "Pull nomic-embed-text embedding model"))

    # 3. Install Python dependencies
    checks.append(run(
        f"{sys.executable} -m pip install chromadb requests --quiet",
        "Install ChromaDB and requests"
    ))

    # 4. Verify imports
    try:
        import chromadb
        print(f"\n  ✓ ChromaDB version: {chromadb.__version__}")
        checks.append(True)
    except ImportError:
        print("\n  ✗ ChromaDB import failed")
        checks.append(False)

    try:
        import requests
        print(f"  ✓ Requests available")
        checks.append(True)
    except ImportError:
        print(f"  ✗ Requests import failed")
 
scan_drop_folders function · python · L90-L109 (20 LOC)
dex-sweep.py
def scan_drop_folders():
    """
    Find all ingestible files across all drop folders.

    Only the top level of each folder in DROP_FOLDERS is examined. A
    missing folder is reported with a warning and skipped. Each regular
    file whose lower-cased extension is in INGEST_EXTENSIONS becomes a
    dict with keys "source", "filename", "folder" and "size" (bytes).
    """
    found = []
    for folder in DROP_FOLDERS:
        if not os.path.exists(folder):
            # Rule 15 fix: surface missing drop folders instead of silent skip.
            print(f"  WARN: drop folder not found (skipping): {folder}")
            continue
        for filename in os.listdir(folder):
            filepath = os.path.join(folder, filename)
            if not os.path.isfile(filepath):
                continue
            suffix = os.path.splitext(filename)[1].lower()
            if suffix not in INGEST_EXTENSIONS:
                continue
            record = {
                "source": filepath,
                "filename": filename,
                "folder": folder,
                "size": os.path.getsize(filepath),
            }
            found.append(record)
    return found
classify_scanned_files function · python · L112-L135 (24 LOC)
dex-sweep.py
def classify_scanned_files(files):
    """
    Split scanned files into (user_files, ingest_reports).

    Per STD-DDL-SWEEPREPORT-001 v1.0 classification predicate:
    a file is an ingest report IF AND ONLY IF all three conditions hold:
      1. parent folder name is '_sweep_reports'
      2. filename starts with 'ingest_report_'
      3. filename ends with '.md'

    Each entry is a dict with "source" (full path) and "filename" keys.
    Input order is preserved within each returned list.
    """
    user_files, ingest_reports = [], []
    for item in files:
        parent = os.path.basename(os.path.dirname(item["source"]))
        # Failing any one condition makes it a user file.
        if (
            parent != "_sweep_reports"
            or not item["filename"].startswith("ingest_report_")
            or not item["filename"].endswith(".md")
        ):
            user_files.append(item)
            continue
        ingest_reports.append(item)
    return user_files, ingest_reports
find_previous_report function · python · L138-L147 (10 LOC)
dex-sweep.py
def find_previous_report():
    """
    Find the most recent ingest_report_*.md in SWEEP_REPORTS_DIR.

    "Most recent" means the name that sorts last, so this relies on report
    names sorting chronologically. Returns the full path, or None when the
    directory is missing or holds no matching file.
    """
    if not os.path.exists(SWEEP_REPORTS_DIR):
        return None
    candidates = [
        name for name in os.listdir(SWEEP_REPORTS_DIR)
        if name.startswith("ingest_report_") and name.endswith(".md")
    ]
    if not candidates:
        return None
    return os.path.join(SWEEP_REPORTS_DIR, max(candidates))
write_sweep_report function · python · L150-L298 (149 LOC)
dex-sweep.py
def write_sweep_report(
    ingest_run_id,
    triggered_at,
    files_ingested,
    skipped_files,
    errors,
    ingestion_ok,
    backup_ran,
    backup_path,
    outcome,
    subprocess_output="",
):
    """
    Compose STD-DDL-SWEEPREPORT-001 v1.0 markdown report.
    Write to SWEEP_REPORTS_DIR. Return the written path, or None on failure.
    Failure is WARN-level, does NOT raise.
    """
    try:
        os.makedirs(SWEEP_REPORTS_DIR, exist_ok=True)
        now = datetime.datetime.now(datetime.timezone.utc)
        ts = now.strftime("%Y-%m-%d_%H%M%S")
        us = f"{now.microsecond:06d}"
        fname = f"ingest_report_{ts}.{us}_{ingest_run_id}.md"
        path = os.path.join(SWEEP_REPORTS_DIR, fname)

        # Pipeline state (read-only ChromaDB query)
        pipeline_state = {}
        try:
            from dex_weights import get_client
            client = get_client()
            for cname in ["dex_canon", "ddl_archive", "dex_code", "ext_creator"]:
                try:
  
copy_to_corpus function · python · L301-L342 (42 LOC)
dex-sweep.py
def copy_to_corpus(files, dry_run=False, temp_dir=None):
    """
    Copy files to CANON_DIR (archival) + optionally to temp_dir (for ingest).

    Step 22 Fix A: files are ingested from a small temp dir containing only
    the new files, not from CANON_DIR (which has 5800+ historical files).
    Files also go to CANON_DIR for permanent archival. Originals move to
    _processed/ per existing behavior.
    """
    os.makedirs(CANON_DIR, exist_ok=True)

    copied = []
    for f in files:
        dest = os.path.join(CANON_DIR, f["filename"])
        processed_dir = os.path.join(f["folder"], "_processed")
        os.makedirs(processed_dir, exist_ok=True)
        processed = os.path.join(processed_dir, f["filename"])

        # Handle filename conflicts in CANON_DIR
        if os.path.exists(dest):
            ts = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
            name, ext = os.path.splitext(f["filename"])
            f["filename"] = f"{name}_{ts}{ext}"
            dest = os.
Open data scored by Repobility · https://repobility.com
run_ingestion function · python · L344-L390 (47 LOC)
dex-sweep.py
def run_ingestion(ingest_path, dry_run=False):
    """
    Trigger ingestion via subprocess to dex-ingest.py.

    Step 22 Fix A: ingest_path is a temp dir containing only the new files,
    NOT CANON_DIR (which has 5800+ files and would timeout). This makes
    ingest fast and targeted.

    Returns: (ok: bool, stderr_on_failure: str | None, stdout: str)
    """
    cmd_args = ["python", INGEST_SCRIPT, "--path", str(ingest_path),
                "--collection", "dex_canon", "--fast", "--skip-backup-check"]

    if dry_run:
        print(f"  [DRY RUN] Would run: {' '.join(cmd_args)}")
        return (True, None, "")

    if not os.path.exists(INGEST_SCRIPT):
        msg = f"Ingest script not found: {INGEST_SCRIPT}"
        print(f"  ERROR: {msg}")
        return (False, msg, "")

    try:
        print(f"  Running ingestion against {ingest_path}...")
        ingest_env = {**os.environ, "PYTHONIOENCODING": "utf-8"}
        result = subprocess.run(
            cmd_args,
            captu
log_sweep function · python · L392-L430 (39 LOC)
dex-sweep.py
def log_sweep(files_found, files_copied, ingestion_ok,
              start_ts=None, end_ts=None,
              backup_ran=None, backup_path=None,
              error=None, recovery_hint=None,
              subprocess_stderr=None, dry_run=False,
              outcome=None, report_written=None,
              report_write_error=None):
    """
    Append a JSON line to dex-sweep-log.jsonl.

    All new fields default to None so old readers tolerate the schema
    evolution. Log write failures are non-blocking (Rule 15 compliance:
    non-silent WARN, but must not prevent sweep completion).
    """
    entry = {
        "timestamp": datetime.datetime.now().isoformat(),
        "start_ts": start_ts,
        "end_ts": end_ts,
        "files_found": len(files_found),
        "files_copied": len(files_copied),
        "filenames": [f["filename"] for f in files_copied],
        "ingestion_triggered": len(files_copied) > 0,
        "ingestion_success": ingestion_ok,
        "backup_ran": backup_r
sweep function · python · L435-L612 (178 LOC)
dex-sweep.py
def sweep(dry_run=False):
    """
    Run one sweep cycle per STD-DDL-SWEEPREPORT-001 v1.0.

    Classification logic:
    - If only ingest reports found (no user files): skip (no gate, no ingest, no report)
    - If nothing found: skip
    - Otherwise: gate → copy → ingest → report

    Guarantees a log_sweep call via try/finally.
    """
    start_ts = datetime.datetime.now().isoformat()
    triggered_at = datetime.datetime.now(datetime.timezone.utc).isoformat()
    ts = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    print(f"\n  {'='*50}")
    print(f"  DEX JR AUTO-SWEEP v2.0 — {ts}")
    print(f"  {'='*50}")
    print(f"  Scanning {len(DROP_FOLDERS)} drop folder(s)...")

    # State captured across try/finally
    files: list = []
    copied: list = []
    ingestion_ok = False
    backup_ran = None
    backup_path = None
    error = None
    recovery_hint = None
    subprocess_stderr = None
    outcome = "no_files_found"
    report_path = None
    report_write_error = Non
watch function · python · L617-L633 (17 LOC)
dex-sweep.py
def watch(interval_minutes, dry_run=False):
    """
    Continuously watch for new files.

    Prints a banner listing each drop folder and whether it exists, then
    runs sweep() and sleeps ``interval_minutes`` minutes, repeating until
    interrupted with Ctrl+C.
    """
    print("\n  DEX JR AUTO-SWEEP — Watch Mode")
    print(f"  Checking every {interval_minutes} minute(s)")
    print(f"  Drop folders: {len(DROP_FOLDERS)}")
    for folder in DROP_FOLDERS:
        status = "OK" if os.path.exists(folder) else "NOT FOUND"
        print(f"    [{status}] {folder}")
    print("  Press Ctrl+C to stop\n")

    pause_seconds = interval_minutes * 60
    try:
        while True:
            sweep(dry_run=dry_run)
            time.sleep(pause_seconds)
    except KeyboardInterrupt:
        # Ctrl+C during a sweep or during the sleep ends watch mode.
        print("\n  Watch stopped.")
run_self_tests function · python · L638-L700 (63 LOC)
dex-sweep.py
def run_self_tests():
    """D8 self-tests per STD-DDL-SWEEPREPORT-001 classification predicate."""
    print("Running self-tests...")
    passed = 0

    # Classification predicate: 6 cases
    def make_file(source, filename):
        return {"source": source, "filename": filename, "folder": os.path.dirname(source), "size": 100}

    # Case 1: valid report (all 3 conditions true)
    f1 = make_file(r"C:\DDL_Ingest\_sweep_reports\ingest_report_2026-04-12.md", "ingest_report_2026-04-12.md")
    u, r = classify_scanned_files([f1])
    assert len(r) == 1 and len(u) == 0, f"Case 1 failed: {u}, {r}"
    print("  [OK] Case 1: valid report classified as report")
    passed += 1

    # Case 2: wrong parent folder
    f2 = make_file(r"C:\DDL_Ingest\ingest_report_2026-04-12.md", "ingest_report_2026-04-12.md")
    u, r = classify_scanned_files([f2])
    assert len(u) == 1 and len(r) == 0, f"Case 2 failed"
    print("  [OK] Case 2: wrong parent → user file")
    passed += 1

    # Case 3: wrong pr
main function · python · L703-L719 (17 LOC)
dex-sweep.py
def main():
    """
    Command-line entry point for the auto-sweep tool.

    Parses --watch, --interval, --dry-run and --self-test, then runs
    exactly one mode: self-tests if --self-test is given (it wins over
    every other flag), continuous watching if --watch is given,
    otherwise a single sweep.
    """
    cli = argparse.ArgumentParser(description="Dex Jr Auto-Sweep v2.0")
    cli.add_argument(
        "--watch",
        action="store_true",
        help="Watch continuously",
    )
    cli.add_argument(
        "--interval",
        type=int,
        default=DEFAULT_INTERVAL,
        help="Minutes between checks",
    )
    cli.add_argument(
        "--dry-run",
        action="store_true",
        help="Preview without acting",
    )
    cli.add_argument(
        "--self-test",
        action="store_true",
        help="Run classification + report self-tests",
    )

    opts = cli.parse_args()

    if opts.self_test:
        run_self_tests()
    elif opts.watch:
        watch(opts.interval, dry_run=opts.dry_run)
    else:
        sweep(dry_run=opts.dry_run)
collection_exists function · python · L53-L58 (6 LOC)
dex_weights.py
def collection_exists(client, name: str) -> bool:
    """
    Report whether `client` can hand back a collection called `name`.

    The check is a probe: client.get_collection(name) is called and its
    result discarded. Any Exception raised by that call is interpreted
    as "not there" and yields False; a clean return yields True.
    """
    try:
        client.get_collection(name)
    except Exception:
        return False
    else:
        return True
embed function · python · L62-L69 (8 LOC)
dex_weights.py
def embed(text: str) -> list[float]:
    """
    Fetch the embedding for `text` from the endpoint at OLLAMA_EMBED_URL.

    POSTs a JSON body carrying EMBED_MODEL and the text as "prompt"
    (60-second timeout) and returns the "embedding" entry of the JSON
    reply. Errors are not swallowed: an HTTP error status surfaces via
    raise_for_status(), and a reply without an "embedding" key raises
    KeyError.
    """
    payload = {"model": EMBED_MODEL, "prompt": text}
    response = requests.post(OLLAMA_EMBED_URL, json=payload, timeout=60)
    response.raise_for_status()
    body = response.json()
    return body["embedding"]
All rows scored by the Repobility analyzer (https://repobility.com)
calculate_weight function · python · L73-L95 (23 LOC)
dex_weights.py
def calculate_weight(collection_name: str, metadata: dict) -> float:
    base      = COLLECTIONS.get(collection_name, {}).get("base_weight", 0.65)
    file_type = metadata.get("file_type", "").lower()
    type_mult = FILE_TYPE_WEIGHTS.get(file_type, 1.00)

    filename = metadata.get("filename", "").lower()
    source   = metadata.get("source_file", "").lower()

    # Pattern-based type detection if file_type not set cleanly
    if file_type == "" or file_type not in FILE_TYPE_WEIGHTS:
        if any(x in filename for x in ["cr-llms", "cr-site", "cr-ops", "cr-infra", "cr-nuet"]):
            type_mult = FILE_TYPE_WEIGHTS["council_review"]
        elif any(x in filename for x in ["std-", "pro-c2d", "pro-"]):
            type_mult = FILE_TYPE_WEIGHTS["protocol"]
        elif any(x in source for x in ["98_threads", "thread"]):
            type_mult = FILE_TYPE_WEIGHTS["thread_export"]
        elif any(x in source for x in ["note", "ios_note"]):
            type_mult = FILE_TYPE_WEIGHTS["i
score_result function · python · L98-L105 (8 LOC)
dex_weights.py
def score_result(distance: float, weight: float) -> float:
    """
    Turn an L2 distance into a weighted similarity score.

    The distance is mapped to a similarity with 1/(1+distance), which
    stays well-behaved whatever the scale of the distance, then scaled
    by `weight` and rounded to 6 decimal places. A larger score means a
    better match.
    """
    return round(weight * (1.0 / (1.0 + distance)), 6)
weighted_query function · python · L109-L168 (60 LOC)
dex_weights.py
def weighted_query(
    query_text: str,
    n_results: int = 5,
    include_external: bool = False,
    collections: Optional[list] = None,
    where_filter: Optional[dict] = None
) -> list[dict]:
    client          = get_client()
    query_embedding = embed(query_text)

    if collections:
        target_collections = collections
    else:
        target_collections = ["dex_canon", "ddl_archive", "ext_creator", "ext_reference", "dex_code"]
        if include_external:
            target_collections += ["ext_canon", "ext_archive"]

    fetch_per_collection = max(n_results * 3, 15)
    all_results = []

    for coll_name in target_collections:
        if not collection_exists(client, coll_name):
            continue
        try:
            collection   = client.get_collection(coll_name)
            query_kwargs = {
                "query_embeddings": [query_embedding],
                "n_results":        fetch_per_collection,
                "include":          ["documents", "metadat
‹ prevpage 3 / 4next ›