← back to dropdownlogistics__dex-rag

Function bodies 161 total

All specs Real LLM only Function bodies
read_sample function · python · L56-L66 (11 LOC)
audit_ddl_ingest_xref.py
def read_sample(full_path, max_chars=1800):
    """Return a representative text sample from a file, or None on failure.

    Files longer than 3000 characters yield the ``max_chars`` characters that
    start at offset 800, so the sample is taken from the body rather than the
    title/header. Shorter files yield their first ``max_chars`` characters.

    Args:
        full_path: Path of the file to sample. It is decoded as UTF-8 with
            undecodable bytes replaced.
        max_chars: Maximum number of characters to return.

    Returns:
        The sampled text, or None if the file could not be read.
    """
    long_file_threshold = 3000
    skip_chars = 800
    try:
        # Only a bounded prefix is needed: enough to tell whether the file is
        # "long" and enough to fill the requested slice.
        read_limit = max(long_file_threshold + 1, skip_chars + max_chars)
        with open(full_path, "r", encoding="utf-8", errors="replace") as f:
            txt = f.read(read_limit)
        # Take a middle-ish slice — avoids title/header-only matching
        if len(txt) > long_file_threshold:
            return txt[skip_chars:skip_chars + max_chars]
        return txt[:max_chars]
    except Exception:
        # Deliberately best-effort: any failure simply means "no sample".
        return None
metadata_filename_hits function · python · L69-L92 (24 LOC)
audit_ddl_ingest_xref.py
def metadata_filename_hits(client, coll_name, stem):
    """Count chunks whose metadata names exactly this file.

    Runs an equality ``where`` filter against each of the ``filename``,
    ``source_file`` and ``source`` metadata fields and sums the matching ids.
    Each lookup is capped at 5 results, so the total is at most 15. Only
    exact matches are counted; no substring scan is performed.

    Args:
        client: Client object exposing ``get_collection(name)``.
        coll_name: Name of the collection to search.
        stem: File name to compare against the metadata fields.

    Returns:
        None if the collection cannot be opened; otherwise the number of
        matching ids (0 when nothing matches or every lookup fails).
    """
    try:
        coll = client.get_collection(coll_name)
    except Exception:
        return None  # collection missing
    hits = 0
    for field in ("filename", "source_file", "source"):
        try:
            r = coll.get(where={field: stem}, limit=5, include=["metadatas"])
            if r and r.get("ids"):
                hits += len(r["ids"])
        except Exception:
            # A lookup that fails for one field contributes no hits; the
            # remaining fields are still tried.
            continue
    return hits
semantic_hits function · python · L95-L116 (22 LOC)
audit_ddl_ingest_xref.py
def semantic_hits(client, coll_name, text_sample, top_k=3, distance_threshold=0.6):
    """Query a collection with the embedding of ``text_sample``.

    Returns a 2-tuple whose shape depends on the outcome:
      * ``(None, None)`` when the collection cannot be opened;
      * ``(0, None)`` when the query yields no distances;
      * ``(count, (best_distance, first_metadata))`` on success, where
        ``count`` is the number of results with distance strictly below
        ``distance_threshold`` and ``first_metadata`` is ``{}`` if the query
        returned no metadata;
      * ``(None, "ERR: ...")`` when embedding or querying raises.
    """
    try:
        collection = client.get_collection(coll_name)
    except Exception:
        return None, None

    try:
        vector = embed(text_sample)
        result = collection.query(
            query_embeddings=[vector],
            n_results=top_k,
            include=["distances", "metadatas"],
        )
        distances = result["distances"][0] if result.get("distances") else []
        metadatas = result["metadatas"][0] if result.get("metadatas") else []
        if not distances:
            return 0, None
        closest = min(distances)
        near = len([d for d in distances if d < distance_threshold])
        first_meta = metadatas[0] if metadatas else {}
        return near, (closest, first_meta)
    except Exception as exc:
        return None, f"ERR: {exc}"
main function · python · L119-L189 (71 LOC)
audit_ddl_ingest_xref.py
def main():
    print("=" * 80)
    print("DDL_INGEST -> DEX-RAG CORPUS CROSS-REFERENCE (READ-ONLY)")
    print("=" * 80)
    client = get_client()
    # list collections present
    try:
        present = [c.name for c in client.list_collections()]
    except Exception as e:
        print(f"FATAL: can't list collections: {e}")
        return
    print(f"\nCollections present: {present}")
    print(f"Expected from COLLECTIONS dict: {COLLS}")
    missing = [c for c in COLLS if c not in present]
    if missing:
        print(f"MISSING: {missing}")
    print()

    # Content-based clusters
    for cluster, rel_paths in CLUSTERS.items():
        print("-" * 80)
        print(f"CLUSTER: {cluster}")
        for rel in rel_paths:
            full = os.path.join(INGEST_ROOT, rel)
            stem = os.path.basename(rel)
            print(f"\n  FILE: {stem}")
            if not os.path.exists(full):
                print(f"    [missing on disk: {full}]")
                continue
            sam
get_collection function · python · L55-L61 (7 LOC)
dex-acquire.py
def get_collection(name: str):
    """Return the named collection from the persistent store at CHROMA_PATH.

    The collection is created if it does not exist yet. If fetching or
    creating it raises, an error line is printed and the process exits with
    status 1.
    """
    store = chromadb.PersistentClient(path=CHROMA_PATH)
    try:
        collection = store.get_or_create_collection(name=name)
    except Exception as err:
        print(f"  [ERROR] ChromaDB collection '{name}': {err}")
        sys.exit(1)
    return collection
fetch_url function · python · L65-L81 (17 LOC)
dex-acquire.py
def fetch_url(url: str) -> tuple[str, int]:
    """Fetch a URL and reduce its HTML to plain text.

    Script, style, nav, footer, header and aside elements are dropped before
    text extraction; runs of three or more newlines collapse to a blank line
    and runs of spaces collapse to one space.

    Args:
        url: Address to download.

    Returns:
        ``(text, char_count)`` where ``char_count`` is the length of the
        returned ``text``. ``("", 0)`` is returned when the request fails
        (connection error, timeout, or non-2xx status).
    """
    headers = {"User-Agent": "Mozilla/5.0 (DexJr RAG Acquisition; DDL Research)"}
    try:
        response = requests.get(url, headers=headers, timeout=REQUEST_TIMEOUT)
        response.raise_for_status()
        html = response.text
    except requests.RequestException:
        # Best-effort fetch: callers treat empty text as "nothing to evaluate".
        return "", 0

    soup = BeautifulSoup(html, "html.parser")
    # Remove nav, footer, scripts, styles
    for tag in soup(["script", "style", "nav", "footer", "header", "aside"]):
        tag.decompose()
    text = soup.get_text(separator="\n", strip=True)
    # Collapse excessive whitespace
    text = re.sub(r'\n{3,}', '\n\n', text)
    text = re.sub(r' {2,}', ' ', text)
    # Strip before measuring so the count always describes the returned text.
    text = text.strip()
    return text, len(text)
evaluate_content function · python · L109-L136 (28 LOC)
dex-acquire.py
def evaluate_content(url: str, text: str, topic: str) -> dict:
    """Ask Dex Jr. to evaluate content quality and relevance."""
    if not text:
        return {
            "overall": 0, "decision": "SKIP",
            "reason": "Empty content — page may require JavaScript or blocked fetch.",
            "relevance": 0, "quality": 0, "redundancy": "unknown", "usability": 0
        }

    preview = text[:3000]
    prompt = EVAL_PROMPT_TEMPLATE.format(
        topic=topic, content_preview=preview, url=url
    )

    try:
        response = ollama.chat(
            model=EVAL_MODEL,
            messages=[{"role": "user", "content": prompt}],
            options={"temperature": 0.1}
        )
        raw = response["message"]["content"]
        return parse_evaluation(raw)
    except Exception as e:
        return {
            "overall": 0, "decision": "FLAG",
            "reason": f"Evaluation failed: {e}",
            "relevance": 0, "quality": 0, "redundancy": "unknown", "usability": 
Want fix-PRs on findings? Install Repobility's GitHub App · github.com/apps/repobility-bot
parse_evaluation function · python · L138-L172 (35 LOC)
dex-acquire.py
def parse_evaluation(raw: str) -> dict:
    """Parse Dex Jr.'s structured evaluation response."""
    result = {
        "relevance": 0, "quality": 0, "redundancy": "unknown",
        "usability": 0, "overall": 0, "decision": "FLAG", "reason": ""
    }
    try:
        for line in raw.strip().splitlines():
            if line.startswith("RELEVANCE:"):
                result["relevance"] = float(line.split(":")[1].strip())
            elif line.startswith("QUALITY:"):
                result["quality"] = float(line.split(":")[1].strip())
            elif line.startswith("REDUNDANCY:"):
                result["redundancy"] = line.split(":")[1].strip().lower()
            elif line.startswith("USABILITY:"):
                result["usability"] = float(line.split(":")[1].strip())
            elif line.startswith("OVERALL:"):
                result["overall"] = float(line.split(":")[1].strip())
            elif line.startswith("DECISION:"):
                result["decision"] = line.split(":")
build_header function · python · L176-L187 (12 LOC)
dex-acquire.py
def build_header(url: str, topic: str, score: float, fetch_date: str) -> str:
    """Build the attribution header that precedes an ingested document.

    The header lists the source URL, its network location, the topic, the
    quality score out of 10, the fetch date and the ingesting tool, followed
    by a 60-character ``=`` rule and a blank line.
    """
    header_lines = [
        f"SOURCE: {url}",
        f"DOMAIN: {urlparse(url).netloc}",
        f"TOPIC: {topic}",
        f"QUALITY_SCORE: {score}/10",
        f"FETCH_DATE: {fetch_date}",
        "INGESTED_BY: dex-acquire.py v1.0",
        "=" * 60,
    ]
    return "\n".join(header_lines) + "\n\n"
chunk_text function · python · L189-L197 (9 LOC)
dex-acquire.py
def chunk_text(text: str) -> list[str]:
    """Split text into fixed-size character chunks with overlap.

    Consecutive chunks start ``CHUNK_SIZE - CHUNK_OVERLAP`` characters apart
    and are at most ``CHUNK_SIZE`` characters long; the last chunk may be
    shorter. Empty text yields an empty list.

    Raises:
        ValueError: If ``CHUNK_OVERLAP`` is not smaller than ``CHUNK_SIZE``.
            The window would then never advance, and the chunking loop
            would not terminate.
    """
    step = CHUNK_SIZE - CHUNK_OVERLAP
    if step <= 0:
        raise ValueError(
            f"CHUNK_OVERLAP ({CHUNK_OVERLAP}) must be smaller than "
            f"CHUNK_SIZE ({CHUNK_SIZE})"
        )
    return [text[start:start + CHUNK_SIZE] for start in range(0, len(text), step)]
ingest_to_collection function · python · L199-L252 (54 LOC)
dex-acquire.py
def ingest_to_collection(url: str, text: str, topic: str,
                          score: float, collection_name: str) -> int:
    """Ingest content into specified ChromaDB collection. Returns chunk count."""
    fetch_date = datetime.now().strftime("%Y-%m-%d")
    header = build_header(url, topic, score, fetch_date)
    full_text = header + text

    chunks = chunk_text(full_text)
    if not chunks:
        return 0

    collection = get_collection(collection_name)
    domain = urlparse(url).netloc
    url_hash = hashlib.sha256(url.encode()).hexdigest()[:12]

    ids, embeddings, metadatas, documents = [], [], [], []

    for i, chunk in enumerate(chunks):
        chunk_id = f"acq_{url_hash}_{i:04d}"
        try:
            emb_response = ollama.embeddings(model=EMBED_MODEL, prompt=chunk)
            embedding = emb_response["embedding"]
        except Exception as e:
            print(f"    [WARN] Embedding failed for chunk {i}: {e}")
            continue

        ids.append(chunk_
save_to_folder function · python · L256-L273 (18 LOC)
dex-acquire.py
def save_to_folder(url: str, text: str, topic: str, score: float,
                   subfolder: str, base_dir: str) -> str:
    """Save acquired content as a .txt file and return its path.

    The file name is derived from the URL's network location (dots turned
    into underscores) and the first 40 characters of its path (slashes turned
    into underscores). Any remaining character that is not a word character,
    dot or hyphen is replaced with ``_`` so that ports, backslashes and other
    reserved characters cannot produce an invalid or directory-escaping
    file name.

    Args:
        url: Source URL of the content.
        text: Content to write after the attribution header.
        topic: Topic recorded in the header.
        score: Quality score recorded in the header.
        subfolder: Directory below ``base_dir`` to write into; created if
            missing.
        base_dir: Root directory for saved acquisitions.

    Returns:
        The path of the written file, as a string.
    """
    parsed = urlparse(url)
    unsafe_chars = r"[^\w.\-]"
    domain = re.sub(unsafe_chars, "_", parsed.netloc).replace(".", "_")
    raw_slug = parsed.path.strip("/").replace("/", "_")[:40]
    path_slug = re.sub(unsafe_chars, "_", raw_slug)
    filename = f"{domain}_{path_slug}.txt" if path_slug else f"{domain}.txt"

    folder = Path(base_dir) / subfolder
    folder.mkdir(parents=True, exist_ok=True)
    filepath = folder / filename

    fetch_date = datetime.now().strftime("%Y-%m-%d")
    header = build_header(url, topic, score, fetch_date)

    with open(filepath, "w", encoding="utf-8") as f:
        f.write(header + text)

    return str(filepath)
log_result function · python · L277-L279 (3 LOC)
dex-acquire.py
def log_result(entry: dict):
    """Append ``entry`` to LOG_FILE as one JSON document on its own line."""
    with open(LOG_FILE, mode="a", encoding="utf-8") as log_handle:
        serialized = json.dumps(entry)
        log_handle.write(f"{serialized}\n")
print_report function · python · L283-L331 (49 LOC)
dex-acquire.py
def print_report(results: list[dict], topic: str, auto_ingest: bool, collection: str):
    ingested = [r for r in results if r["action"] == "INGESTED"]
    flagged  = [r for r in results if r["action"] == "FLAGGED"]
    skipped  = [r for r in results if r["action"] == "SKIPPED"]
    failed   = [r for r in results if r["action"] == "FAILED"]

    total_chunks = sum(r.get("chunks_added", 0) for r in ingested)

    print("\n" + "="*60)
    print("  DEX JR. ACQUISITION REPORT")
    print(f"  Topic: {topic}")
    print(f"  Mode: {'AUTO-INGEST' if auto_ingest else 'REVIEW ONLY'}")
    print(f"  Collection: {collection}")
    print(f"  Run: {datetime.now().strftime('%Y-%m-%d %H:%M')}")
    print("="*60)
    print(f"\n  URLS PROCESSED: {len(results)}")
    print(f"  INGESTED:  {len(ingested)} ({total_chunks} chunks added)")
    print(f"  FLAGGED:   {len(flagged)}  (needs review)")
    print(f"  SKIPPED:   {len(skipped)}")
    print(f"  FAILED:    {len(failed)}")

    if ingested:
        print
load_urls function · python · L335-L351 (17 LOC)
dex-acquire.py
def load_urls(source: str) -> list[str]:
    """Load URLs from a text file (one per line) or a JSON plan.

    A ``.json`` source may be either a list whose items are URL strings or
    objects with a ``"url"`` key, or an object with a ``"urls"`` list. Any
    other source is read as text: blank lines and lines whose first
    non-blank character is ``#`` are skipped, and surrounding whitespace is
    removed from each URL.

    Files are decoded as UTF-8; a leading byte-order mark is ignored.

    Args:
        source: Path of the file to read.

    Returns:
        The URLs in file order.

    Exits the process with status 1 if the file does not exist.
    """
    path = Path(source)
    if not path.exists():
        print(f"[ERROR] File not found: {source}")
        sys.exit(1)

    if str(source).endswith(".json"):
        with open(path, encoding="utf-8-sig") as f:
            plan = json.load(f)
        # Support {"urls": [...]} or [{"url": ..., "topic": ...}]
        if isinstance(plan, list):
            return [item["url"] if isinstance(item, dict) else item for item in plan]
        return plan.get("urls", [])

    with open(path, encoding="utf-8-sig") as f:
        # Strip first so that indented comment lines are recognised too.
        stripped = (line.strip() for line in f)
        return [line for line in stripped if line and not line.startswith("#")]
Repobility (the analyzer behind this table) · https://repobility.com
main function · python · L355-L484 (130 LOC)
dex-acquire.py
def main():
    parser = argparse.ArgumentParser(
        description="Batch URL acquisition with Dex Jr. quality evaluation"
    )
    parser.add_argument("--topic",       required=False, help="Topic for relevance evaluation")
    parser.add_argument("--urls",        help="File of URLs (one per line)")
    parser.add_argument("--from-plan",   help="JSON acquisition plan")
    parser.add_argument("--auto-ingest", action="store_true",
                        help="Automatically ingest content scoring 7+/10")
    parser.add_argument("--review-only", action="store_true",
                        help="Evaluate only, do not ingest (default)")
    parser.add_argument("--collection",  default="ext_archive",
                        help="ChromaDB collection (default: ext_archive)")
    parser.add_argument("--save-dir",    default="acquisitions",
                        help="Base directory to save flagged/reviewed files")
    args = parser.parse_args()

    # Validate
    if not args.urls and 
RestoreTestFailedError class · python · L50-L61 (12 LOC)
dex-backup.py
class RestoreTestFailedError(Exception):
    """
    Raised by restore_test() when a backup fails post-creation verification.

    Trigger 6 of STD-DDL-BACKUP-001 (pending formalization). The backup
    directory that failed the restore test is NOT automatically renamed
    or deleted — the operator inspects it manually. The exception message
    contains the full diff (collection counts, missing collections, etc.).
    """
    def __init__(self, message: str, result: dict | None = None):
        super().__init__(message)
        self.result = result
__init__ method · python · L59-L61 (3 LOC)
dex-backup.py
    def __init__(self, message: str, result: dict | None = None):
        super().__init__(message)
        self.result = result
cleanup_stale_scratch function · python · L72-L105 (34 LOC)
dex-backup.py
def cleanup_stale_scratch(max_age_hours: float = 1.0) -> int:
    """
    Reclaim orphan restore_test_* directories in dex-rag-scratch/ that
    are older than max_age_hours.

    Exists because restore_test()'s in-process cleanup can fail on
    Windows when ChromaDB's HNSW data_level0.bin is still memory-mapped
    after `del client` + `gc.collect()`. The mmap is released when the
    Python process exits, so the *next* dex-backup.py run can safely
    remove what the previous run left behind.

    Tolerates file-lock errors by logging a WARN and continuing.
    Returns the count of directories actually removed.
    """
    scratch_root = Path(__file__).parent.parent / "dex-rag-scratch"
    if not scratch_root.exists():
        return 0

    removed = 0
    cutoff = datetime.now(timezone.utc).timestamp() - (max_age_hours * 3600)
    for child in scratch_root.iterdir():
        if not child.is_dir():
            continue
        if not child.name.startswith("restore_test_"):
         
find_existing_backups function · python · L108-L117 (10 LOC)
dex-backup.py
def find_existing_backups() -> list[Path]:
    """List backup directories under BACKUP_ROOT, newest name first.

    A backup is a directory whose name starts with ``chromadb_`` and does
    not end with ``_FAILED``. Ordering is by directory name, descending.
    An empty list is returned when BACKUP_ROOT does not exist.
    """
    if not BACKUP_ROOT.exists():
        return []

    found = []
    for entry in BACKUP_ROOT.iterdir():
        if not entry.is_dir():
            continue
        if not entry.name.startswith("chromadb_"):
            continue
        if entry.name.endswith("_FAILED"):
            continue
        found.append(entry)
    return sorted(found, key=lambda entry: entry.name, reverse=True)
get_live_chunk_count function · python · L120-L128 (9 LOC)
dex-backup.py
def get_live_chunk_count() -> int:
    """Return the sum of ``count()`` over every collection of the live client.

    The directory containing this script is put at the front of ``sys.path``
    so that ``dex_weights`` can be imported for its ``get_client`` factory.
    """
    script_dir = Path(__file__).parent
    sys.path.insert(0, str(script_dir))
    from dex_weights import get_client

    live = get_client()
    return sum(
        live.get_collection(listed.name).count()
        for listed in live.list_collections()
    )
read_manifest function · python · L131-L139 (9 LOC)
dex-backup.py
def read_manifest(backup_dir: Path) -> dict | None:
    """Read a backup's _manifest.json. Returns None if missing or invalid."""
    manifest_path = backup_dir / "_manifest.json"
    if not manifest_path.exists():
        return None
    try:
        return json.loads(manifest_path.read_text(encoding="utf-8"))
    except (json.JSONDecodeError, OSError):
        return None
check_triggers function · python · L142-L182 (41 LOC)
dex-backup.py
def check_triggers(expected_write_chunks: int = 0) -> tuple[bool, list[str]]:
    """
    Check all backup triggers per STD-DDL-BACKUP-001.
    Returns (should_backup, list_of_triggers_that_fired).
    """
    fired = []
    backups = find_existing_backups()

    if not backups:
        # No backups exist at all — Trigger 1 effectively fires
        fired.append("no_existing_backups")
        return (True, fired)

    most_recent = backups[0]
    manifest = read_manifest(most_recent)

    if manifest is None:
        fired.append("most_recent_manifest_invalid")
        return (True, fired)

    # Trigger 1: time-based
    last_backup_at = datetime.fromisoformat(manifest["created_at"].replace("Z", "+00:00"))
    age = datetime.now(timezone.utc) - last_backup_at
    if age > timedelta(days=TRIGGER_DAYS):
        fired.append(f"time_based_age_{age.days}d")

    # Trigger 2: volume-based
    try:
        live_count = get_live_chunk_count()
        backup_count = manifest.get("total_chunk_c
Repobility — same analyzer, your code, free for public repos · /scan/
build_check_status function · python · L185-L266 (82 LOC)
dex-backup.py
def build_check_status(expected_write_chunks: int = 0) -> dict:
    """
    Lightweight status of the most recent backup, suitable for the
    --check-only --json path consumed by ensure_backup_current().

    Does NOT call validate_backup() — that function compares the backup
    against current live state and would fail on any drift since the
    backup was taken. This function checks existence + manifest + sqlite
    readability only, plus runs check_triggers() to report what would
    fire if a write happened now.
    """
    result = {
        "exists": False,
        "most_recent": None,
        "most_recent_path": None,
        "manifest_valid": False,
        "sqlite_present": False,
        "sqlite_readable": False,
        "age_hours": None,
        "created_at": None,
        "total_chunk_count": None,
        "triggers_to_fire": [],
        "should_backup": False,
        "live_chunk_count": None,
        "expected_write_chunks": expected_write_chunks,
    }

    backups = 
sha256_file function · python · L269-L274 (6 LOC)
dex-backup.py
def sha256_file(path: Path) -> str:
    """Return the hex SHA-256 digest of a file, read in 64 KiB blocks."""
    digest = hashlib.sha256()
    with open(path, "rb") as stream:
        while block := stream.read(65536):
            digest.update(block)
    return digest.hexdigest()
query_collection_state function · python · L277-L302 (26 LOC)
dex-backup.py
def query_collection_state(sqlite_path: Path) -> dict[str, int]:
    """
    Query a ChromaDB SQLite file for collection names and chunk counts.
    Read-only. Returns dict of {collection_name: chunk_count}.
    """
    db_uri = f"file:/{str(sqlite_path).replace(chr(92), '/')}?mode=ro"
    con = sqlite3.connect(db_uri, uri=True)
    cur = con.cursor()
    cur.execute("SELECT id, name FROM collections")
    collections = cur.fetchall()
    result = {}
    for col_id, col_name in collections:
        # Count via embeddings table joined through segments
        # Fall back to a simpler count if join is unreliable
        try:
            cur.execute("""
                SELECT COUNT(*) FROM embeddings e
                JOIN segments s ON e.segment_id = s.id
                WHERE s.collection = ?
            """, (col_id,))
            result[col_name] = cur.fetchone()[0]
        except sqlite3.OperationalError:
            # Schema variant — just total embeddings, not per collection
      
perform_backup function · python · L305-L366 (62 LOC)
dex-backup.py
def perform_backup(dry_run: bool = False) -> tuple[bool, Path | None, dict]:
    """
    Run the actual backup. Returns (success, backup_path, manifest).
    """
    started_at = datetime.now(timezone.utc)
    timestamp = utc_now_compact()
    backup_dir = BACKUP_ROOT / f"chromadb_{timestamp}"

    if dry_run:
        print(f"[DRY RUN] Would create backup at: {backup_dir}")
        return (True, None, {})

    print(f"Creating backup at: {backup_dir}")
    BACKUP_ROOT.mkdir(parents=True, exist_ok=True)
    backup_dir.mkdir(parents=True, exist_ok=False)

    # Step 1: SQLite backup API for chroma.sqlite3
    src_sqlite = LIVE_CHROMADB / "chroma.sqlite3"
    dst_sqlite = backup_dir / "chroma.sqlite3"
    print(f"  SQLite backup: {src_sqlite.name}")
    src_con = sqlite3.connect(str(src_sqlite))
    dst_con = sqlite3.connect(str(dst_sqlite))
    src_con.backup(dst_con)
    src_con.close()
    dst_con.close()

    # Step 2: shutil.copytree for each UUID segment directory
    for item in LI
validate_backup function · python · L369-L429 (61 LOC)
dex-backup.py
def validate_backup(backup_dir: Path, manifest: dict) -> tuple[bool, list[str]]:
    """Validate a backup against STD-DDL-BACKUP-001 §"Validation rules"."""
    failures = []

    sqlite_path = backup_dir / "chroma.sqlite3"
    if not sqlite_path.exists():
        failures.append("chroma.sqlite3 missing in backup")
        return (False, failures)

    src_size = (LIVE_CHROMADB / "chroma.sqlite3").stat().st_size
    dst_size = sqlite_path.stat().st_size
    if abs(dst_size - src_size) / src_size > 0.05:
        failures.append(f"size delta >5%: src={src_size}, dst={dst_size}")

    try:
        db_uri = f"file:/{str(sqlite_path).replace(chr(92), '/')}?mode=ro"
        con = sqlite3.connect(db_uri, uri=True)
        con.close()
    except Exception as e:
        failures.append(f"backup sqlite won't open read-only: {e}")

    src_state = query_collection_state(LIVE_CHROMADB / "chroma.sqlite3")
    dst_state = manifest["collections"]
    if set(src_state.keys()) != set(dst_state.keys()):
restore_test function · python · L432-L614 (183 LOC)
dex-backup.py
def restore_test(backup_path: "Path | None" = None) -> dict:
    """
    Trigger 6 — post-backup restore verification.

    Copies a backup to a scratch location outside the repo and outside
    OneDrive, opens it as a fresh ChromaDB PersistentClient, enumerates
    all collections, counts each one, and compares the result to the
    manifest's recorded counts. Any mismatch raises
    RestoreTestFailedError with full diff detail.

    The original backup directory is never opened or modified — only
    the scratch copy is touched. Scratch is always cleaned up via a
    try/finally, even on failure. On Windows, a GC hint + one retry
    with a small delay handles the SQLite file-lock case.

    Args:
        backup_path: path to a backup directory. If None, uses the
            most recent backup from find_existing_backups().

    Returns:
        dict with test result:
            backup_tested: backup dir name
            scratch_path: scratch location (deleted by the time this return
rotate_backups function · python · L617-L659 (43 LOC)
dex-backup.py
def rotate_backups() -> tuple[bool, list[str]]:
    """Apply retention policy. Returns (success, list_of_pruned_paths)."""
    backups = find_existing_backups()
    if len(backups) <= RETAIN_DAILY:
        return (True, [])

    keep = set()

    # Keep last 7 daily
    for b in backups[:RETAIN_DAILY]:
        keep.add(b.name)

    # Keep last 4 weekly (one per week from older backups)
    weeks_kept = set()
    for b in backups[RETAIN_DAILY:]:
        manifest = read_manifest(b)
        if not manifest:
            continue
        created = datetime.fromisoformat(manifest["created_at"].replace("Z", "+00:00"))
        week_key = created.strftime("%Y-W%U")
        if week_key not in weeks_kept and len(weeks_kept) < RETAIN_WEEKLY:
            weeks_kept.add(week_key)
            keep.add(b.name)

    # Keep last 3 monthly
    months_kept = set()
    for b in backups[RETAIN_DAILY:]:
        manifest = read_manifest(b)
        if not manifest:
            continue
        created = dateti
append_log function · python · L662-L665 (4 LOC)
dex-backup.py
def append_log(entry: dict) -> None:
    """Append ``entry`` to BACKUP_LOG as one JSON line.

    BACKUP_ROOT (and any missing parents) is created first.
    """
    BACKUP_ROOT.mkdir(parents=True, exist_ok=True)
    with open(BACKUP_LOG, mode="a", encoding="utf-8") as log_handle:
        serialized = json.dumps(entry)
        log_handle.write(f"{serialized}\n")
Powered by Repobility — scan your code at https://repobility.com
main function · python · L668-L854 (187 LOC)
dex-backup.py
def main():
    parser = argparse.ArgumentParser(description="ChromaDB backup per STD-DDL-BACKUP-001")
    parser.add_argument("--force", action="store_true", help="Backup regardless of triggers")
    parser.add_argument("--dry-run", action="store_true", help="Check triggers, report, no copy")
    parser.add_argument("--rotate-only", action="store_true", help="Skip backup, just rotate")
    parser.add_argument("--check-only", action="store_true", help="Validate most recent backup")
    parser.add_argument("--json", action="store_true", help="Output structured JSON (only with --check-only)")
    parser.add_argument("--restore-test", action="store_true", help="Run Trigger 6 restore test on most recent backup")
    parser.add_argument("--skip-restore-test", action="store_true", help="Skip Trigger 6 restore test after backup creation")
    parser.add_argument("--expected-chunks", type=int, default=0, help="For pre-batch trigger")
    args = parser.parse_args()

    # Suppress banner in --c
get_embedding function · python · L74-L85 (12 LOC)
dex-bridge.py
def get_embedding(text):
    """Request an embedding for ``text`` from OLLAMA_EMBED_URL.

    Returns the ``"embedding"`` field of the JSON reply (None if the field
    is absent). On any exception an error line is printed and None is
    returned.
    """
    try:
        payload = {"model": EMBED_MODEL, "prompt": text}
        reply = requests.post(OLLAMA_EMBED_URL, json=payload, timeout=60)
        reply.raise_for_status()
        body = reply.json()
        return body.get("embedding")
    except Exception as err:
        print(f"  [ERROR] Embedding failed: {err}")
        return None
retrieve function · python · L90-L140 (51 LOC)
dex-bridge.py
def retrieve(query, top_k=TOP_K, use_raw=False, include_external=False):
    """
    Returns (chunks, provenance_string).
    chunks: list of dicts with text, source, distance, label, weighted_score
    provenance: e.g. "[Sources: 3xCanon | 1xArchive | 1xExtCanon]"
    """
    if use_raw:
        # Legacy unweighted path - single collection
        embedding = get_embedding(query)
        if not embedding:
            return [], "[Sources: none]"
        client = get_client()
        collection = client.get_collection(RAW_COLLECTION)
        results = collection.query(
            query_embeddings=[embedding],
            n_results=top_k,
            include=["documents", "metadatas", "distances"],
        )
        chunks = []
        if results and results["documents"]:
            for i, doc in enumerate(results["documents"][0]):
                meta = results["metadatas"][0][i] if results["metadatas"] else {}
                dist = results["distances"][0][i] if results["distances"]
build_context function · python · L145-L158 (14 LOC)
dex-bridge.py
def build_context(chunks, provenance, max_chars=MAX_CONTEXT_CHARS):
    """Assemble the context string handed to the model.

    The result starts with ``provenance`` and a blank line, followed by one
    numbered entry per chunk (source, label, optional weighted score, then
    the chunk text). Entries are added in order until the next one would
    push the running character total past ``max_chars``; that entry and all
    later ones are dropped. Pieces are joined with newlines.
    """
    pieces = [provenance, ""]
    used = len(provenance)

    for number, chunk in enumerate(chunks, start=1):
        label = chunk.get("label", "")
        if chunk.get("weighted_score"):
            score = f" score={chunk['weighted_score']:.4f}"
        else:
            score = ""
        header = f"[Source {number}: {chunk['source']} | {label}{score}]"
        entry = f"{header}\n{chunk['text']}\n"
        projected = used + len(entry)
        if projected > max_chars:
            break
        pieces.append(entry)
        used = projected

    return "\n".join(pieces)
generate function · python · L163-L191 (29 LOC)
dex-bridge.py
def generate(query, context, model=DEFAULT_MODEL, chat_url=OLLAMA_CHAT_URL):
    """Ask the model to answer ``query`` using the retrieved ``context``.

    Sends one non-streaming POST to ``chat_url`` with temperature 0.3 and a
    context window of 8192 and returns the ``"response"`` field of the JSON
    reply, or ``"[No response]"`` if that field is absent. Any exception is
    turned into an ``"[ERROR] Generation failed: ..."`` string rather than
    raised.
    """
    prompt = f"""The following context was retrieved from the DDL knowledge base to help answer the question. Use this context to inform your answer. If the context does not contain relevant information, say so.

RETRIEVED CONTEXT:
{context}

QUESTION:
{query}

Answer based on the retrieved context and your governance training. Cite sources by number when referencing specific retrieved documents."""

    request_body = {
        "model": model,
        "prompt": prompt,
        "stream": False,
        "options": {"temperature": 0.3, "num_ctx": 8192},
    }
    try:
        reply = requests.post(chat_url, json=request_body, timeout=120)
        reply.raise_for_status()
        answer = reply.json().get("response", "[No response]")
    except Exception as err:
        return f"[ERROR] Generation failed: {err}"
    return answer
auto_ingest function · python · L196-L220 (25 LOC)
dex-bridge.py
def auto_ingest(query, response, provenance, sources):
    """Write query+response transcript to bridge-ingest folder and trigger fast canon ingest."""
    try:
        os.makedirs(BRIDGE_INGEST_DIR, exist_ok=True)
        ts = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
        filename = f"bridge_{ts}.txt"
        filepath = os.path.join(BRIDGE_INGEST_DIR, filename)

        with open(filepath, "w", encoding="utf-8") as f:
            f.write(f"DEX JR RAG BRIDGE - Query Transcript\n")
            f.write(f"{'='*60}\n")
            f.write(f"TIMESTAMP: {datetime.datetime.now().isoformat()}\n")
            f.write(f"PROVENANCE: {provenance}\n")
            f.write(f"SOURCES: {', '.join(sources)}\n")
            f.write(f"{'='*60}\n\n")
            f.write(f"QUERY:\n{query}\n\n")
            f.write(f"RESPONSE:\n{response}\n")

        if os.path.exists(INGEST_SCRIPT):
            subprocess.run(
                ["python", INGEST_SCRIPT, "--path", BRIDGE_INGEST_DIR, "--build-canon
log_interaction function · python · L225-L245 (21 LOC)
dex-bridge.py
def log_interaction(query, chunks, provenance, response, model, use_raw, include_external):
    """Append one JSON line describing a query/response exchange to LOG_FILE.

    The record holds the timestamp, query, model, the collection name
    (RAW_COLLECTION when ``use_raw`` is set, otherwise CANON_COLLECTION),
    the ``include_external`` flag, provenance, per-chunk sources, labels,
    weighted scores and distances, plus the response length and its first
    200 characters. A failure while writing is reported with a warning and
    not raised.
    """
    record = dict(
        timestamp=datetime.datetime.now().isoformat(),
        query=query,
        model=model,
        collection=RAW_COLLECTION if use_raw else CANON_COLLECTION,
        include_external=include_external,
        provenance=provenance,
        chunks_retrieved=len(chunks),
        sources=[chunk["source"] for chunk in chunks],
        labels=[chunk.get("label", "") for chunk in chunks],
        weighted_scores=[chunk.get("weighted_score") for chunk in chunks],
        distances=[chunk["distance"] for chunk in chunks],
        response_length=len(response),
        response_preview=response[:200],
    )
    try:
        with open(LOG_FILE, mode="a", encoding="utf-8") as log_handle:
            serialized = json.dumps(record, ensure_ascii=False)
            log_handle.write(f"{serialized}\n")
    except Exception as err:
        print(f"  [WARN] Logging failed: {err}")
display_results function · python · L250-L280 (31 LOC)
dex-bridge.py
def display_results(query, chunks, provenance, response, verbose=False):
    print()
    print("=" * 60)
    print(f" QUERY: {query}")
    print(f" {provenance}")
    print("=" * 60)
    print()

    if verbose:
        print("-" * 60)
        print(" RETRIEVED CONTEXT:")
        print("-" * 60)
        for i, chunk in enumerate(chunks):
            dist  = f"{chunk['distance']:.4f}" if chunk['distance'] is not None else "?"
            score = f"  weighted={chunk['weighted_score']:.4f}" if chunk.get("weighted_score") else ""
            label = chunk.get("label", "")
            print(f"\n  [{i+1}] [{label}] distance={dist}{score}")
            print(f"  source: {chunk['source']}")
            print(f"  {chunk['text'][:200]}...")
        print()
        print("-" * 60)

    print(" ANSWER:")
    print("-" * 60)
    print()
    print(response)
    print()
    print("-" * 60)
    print(f" Sources: {', '.join(c['source'] for c in chunks)}")
    print(f" Log: {LOG_FILE}")
    print("=" * 
Want fix-PRs on findings? Install Repobility's GitHub App · github.com/apps/repobility-bot
interactive function · python · L285-L335 (51 LOC)
dex-bridge.py
def interactive(model=DEFAULT_MODEL, use_raw=False, include_external=False,
                top_k=TOP_K, verbose=False):
    print()
    print("=" * 60)
    print(" DEX JR RAG BRIDGE - Interactive Mode v1.2")
    print(f" Model: {model} | Collection: {'archive' if use_raw else 'canon'}")
    ext_label = " + external" if include_external else ""
    print(f" Top-K: {top_k}{ext_label} | Type 'quit' to exit")
    print("=" * 60)
    print()

    while True:
        try:
            query = input(">>> ").strip()
        except (EOFError, KeyboardInterrupt):
            print("\nExiting.")
            break

        if not query:
            continue
        if query.lower() in ("quit", "exit", "/bye"):
            print("Exiting.")
            break

        # Inline flags
        current_raw      = use_raw
        current_external = include_external
        if query.startswith("--raw "):
            current_raw = True
            query = query[6:].strip()
        if query.startswith("--ex
main function · python · L340-L390 (51 LOC)
dex-bridge.py
def main():
    parser = argparse.ArgumentParser(description="Dex Jr RAG Bridge v1.2 - Weighted Query + Generate + Auto-Ingest")
    parser.add_argument("query",         nargs="?", default=None, help="Question to ask")
    parser.add_argument("--raw",         action="store_true", help="Search archive instead of canon (unweighted)")
    parser.add_argument("--external",    action="store_true", help="Include ext_canon and ext_archive in search")
    parser.add_argument("--model",       default=DEFAULT_MODEL, help="Ollama model to use")
    parser.add_argument("--top",         type=int, default=TOP_K, help="Number of chunks to retrieve")
    parser.add_argument("--verbose",     action="store_true", help="Show retrieved chunks with scores")
    parser.add_argument("--interactive", action="store_true", help="Interactive mode")
    parser.add_argument("--node",        default="local", help="Inference node: local or laptop")
    parser.add_argument("--no-ingest",   action="store_true", help="
source_header function · python · L66-L73 (8 LOC)
dex-convert.py
def source_header(source_path: str, file_type: str, converted_date: str) -> str:
    """Build the provenance banner placed at the top of a converted file.

    The banner lists the source path, the file type, the conversion date and
    the converter version (one per line), followed by a 60-character ``=``
    rule and a blank line.
    """
    rule = "=" * 60
    banner_lines = [
        f"SOURCE: {source_path}",
        f"TYPE: {file_type}",
        f"CONVERTED: {converted_date}",
        "CONVERTED_BY: dex-convert.py v1.0",
        rule,
    ]
    # Every line is newline-terminated; the rule is followed by one blank line.
    return "\n".join(banner_lines) + "\n\n"
clean_text function · python · L75-L80 (6 LOC)
dex-convert.py
def clean_text(text: str) -> str:
    """Strip NUL bytes and squeeze oversized gaps in *text*.

    Runs of four or more newlines become exactly three, runs of three or more
    spaces become exactly two, and leading/trailing whitespace is trimmed.
    """
    cleaned = text.replace("\x00", "")
    # Order matters only for readability; the two patterns never overlap.
    squeeze_rules = (
        (r'\n{4,}', '\n\n\n'),
        (r' {3,}', '  '),
    )
    for pattern, replacement in squeeze_rules:
        cleaned = re.sub(pattern, replacement, cleaned)
    return cleaned.strip()
safe_filename function · python · L82-L84 (3 LOC)
dex-convert.py
def safe_filename(name: str) -> str:
    """Return *name* reduced to filename-safe characters, capped at 80 chars.

    Every character that is not a word character, hyphen, underscore or dot
    is replaced by an underscore; the result is then cut to 80 characters.
    """
    max_length = 80
    sanitized = re.sub(r'[^\w\-_.]', '_', name)
    return sanitized[:max_length]
write_output function · python · L89-L95 (7 LOC)
dex-convert.py
def write_output(content: str, out_path: Path, label: str):
    """Write *content* to *out_path* as UTF-8 and print a short confirmation.

    Missing parent directories are created first. Characters that cannot be
    encoded are replaced rather than raising. After writing, the label, the
    output file name and its on-disk size in KB are printed.
    """
    target_dir = out_path.parent
    target_dir.mkdir(parents=True, exist_ok=True)
    out_path.write_text(content, encoding="utf-8", errors="replace")

    # Report the on-disk size rather than len(content): encoding and newline
    # translation can make the two differ.
    size_kb = out_path.stat().st_size / 1024
    print(f"  [OK] {label}")
    print(f"       → {out_path.name}  ({size_kb:.1f} KB)")
convert_html function · python · L99-L128 (30 LOC)
dex-convert.py
def convert_html(file_path: Path, out_dir: Path, chunk_size: int = 0) -> list[Path]:
    """Strip HTML to clean text. Optionally chunk large files."""
    converted_date = datetime.now().strftime("%Y-%m-%d")
    header = source_header(str(file_path), "html", converted_date)

    if BS4_AVAILABLE:
        with open(file_path, encoding="utf-8", errors="replace") as f:
            soup = BeautifulSoup(f, "html.parser")
        for tag in soup(["script", "style", "nav", "footer", "head"]):
            tag.decompose()
        text = soup.get_text(separator="\n", strip=True)
    else:
        # Fallback: regex strip
        with open(file_path, encoding="utf-8", errors="replace") as f:
            raw = f.read()
        text = re.sub(r'<[^>]+>', ' ', raw)
        text = re.sub(r'&nbsp;', ' ', text)
        text = re.sub(r'&amp;', '&', text)
        text = re.sub(r'&lt;', '<', text)
        text = re.sub(r'&gt;', '>', text)

    text = clean_text(text)
    full_content = header + text

    if
detect_reddit_type function · python · L132-L138 (7 LOC)
dex-convert.py
def detect_reddit_type(filename: str) -> str:
    """Return the first REDDIT_CSV_SCHEMAS key found in *filename*.

    The comparison is case-insensitive on the filename and ignores
    underscores in both strings and hyphens in the filename. Keys are tried
    in the iteration order of REDDIT_CSV_SCHEMAS; "generic" is returned when
    none matches.
    """
    # Normalise the filename once instead of on every loop iteration.
    normalized = filename.lower().replace("_", "").replace("-", "")
    matches = (
        schema_key
        for schema_key in REDDIT_CSV_SCHEMAS
        if schema_key.replace("_", "") in normalized
    )
    return next(matches, "generic")
Repobility (the analyzer behind this table) · https://repobility.com
convert_reddit_csv function · python · L140-L198 (59 LOC)
dex-convert.py
def convert_reddit_csv(file_path: Path, out_dir: Path) -> list[Path]:
    """Convert Reddit CSV export to readable text."""
    converted_date = datetime.now().strftime("%Y-%m-%d")
    reddit_type    = detect_reddit_type(file_path.stem)
    header         = source_header(str(file_path), f"reddit-csv-{reddit_type}", converted_date)

    lines = []
    try:
        with open(file_path, encoding="utf-8", errors="replace", newline="") as f:
            reader = csv.DictReader(f)
            for i, row in enumerate(reader):
                entry_lines = [f"--- Entry {i+1} ---"]

                # Date
                date = row.get("date", row.get("Date", ""))
                if date:
                    entry_lines.append(f"Date: {date}")

                # Subreddit
                sub = row.get("subreddit", row.get("Subreddit", ""))
                if sub:
                    entry_lines.append(f"Subreddit: r/{sub}")

                # Title (posts)
                title = row.get("title
convert_csv_generic function · python · L200-L222 (23 LOC)
dex-convert.py
def convert_csv_generic(file_path: Path, out_dir: Path) -> list[Path]:
    """Convert any CSV to readable text format.

    The first CSV row is used as column names. Each following row becomes a
    ``--- Row N ---`` block with one ``column: value`` line per non-blank
    cell. The result is prefixed with the standard source header and written
    to ``<stem>_converted.txt`` inside *out_dir*.

    Args:
        file_path: CSV file to read (decoded as UTF-8, bad bytes replaced).
        out_dir: Directory that receives the converted text file.

    Returns:
        A one-element list holding the written path, or an empty list when
        the file could not be read or parsed (a warning is printed).
    """
    converted_date = datetime.now().strftime("%Y-%m-%d")
    header = source_header(str(file_path), "csv", converted_date)

    lines = []
    try:
        with open(file_path, encoding="utf-8", errors="replace", newline="") as f:
            reader = csv.DictReader(f)
            for i, row in enumerate(reader, start=1):
                parts = [f"--- Row {i} ---\n"]
                for key, val in row.items():
                    if isinstance(val, list):
                        # A row with more cells than the header: DictReader
                        # collects the surplus cells as a list under the
                        # restkey (None by default). Calling .strip() on that
                        # list used to raise and discard the whole file.
                        val = " | ".join(
                            cell.strip() for cell in val if cell and cell.strip()
                        )
                        if key is None:
                            key = "(extra)"
                    # Short rows yield None for the missing cells; skip those
                    # together with empty / whitespace-only values.
                    if val and val.strip():
                        parts.append(f"{key}: {val.strip()}\n")
                lines.append("".join(parts))
    except Exception as e:
        # Best-effort conversion: report the problem and skip this file
        # instead of aborting the surrounding batch.
        print(f"  [WARN] CSV parse error: {e}")
        return []

    content = header + "\n".join(lines)
    out_path = out_dir / f"{file_path.stem}_converted.txt"
    write_output(content, out_path, file_path.name)
    return [out_path]
convert_json function · python · L226-L269 (44 LOC)
dex-convert.py
def convert_json(file_path: Path, out_dir: Path, chunk_size: int = 0) -> list[Path]:
    """Convert JSON to readable text. Handles Chrome history and generic JSON."""
    converted_date = datetime.now().strftime("%Y-%m-%d")
    header = source_header(str(file_path), "json", converted_date)

    try:
        with open(file_path, encoding="utf-8", errors="replace") as f:
            data = json.load(f)
    except Exception as e:
        print(f"  [WARN] JSON parse error: {e}")
        return []

    lines = []

    # Chrome history detection
    if isinstance(data, dict) and "Browser History" in data:
        items = data["Browser History"]
        lines.append(f"GOOGLE CHROME HISTORY — {len(items)} entries\n")
        for item in items:
            title = item.get("title", "")
            url   = item.get("url", "")
            ts    = item.get("time_usec", "")
            if ts:
                try:
                    dt = datetime.fromtimestamp(int(ts) / 1_000_000)
                 
page 1 / 4next ›