Function bodies 161 total
read_sample function · python · L56-L66 (11 LOC)audit_ddl_ingest_xref.py
def read_sample(full_path, max_chars=1800):
    """Return a representative text sample from the file at ``full_path``.

    Files longer than 3000 characters are sampled starting at offset 800,
    which avoids matching on title/header boilerplate only; shorter files
    are sampled from the beginning. At most ``max_chars`` characters are
    returned. Undecodable bytes are replaced rather than raising.

    Returns ``None`` if the file cannot be read.
    """
    try:
        # Only a bounded prefix is ever used: enough to decide whether the
        # file exceeds the 3000-char threshold and to cover the offset slice.
        needed = max(3001, 800 + max_chars)
        with open(full_path, "r", encoding="utf-8", errors="replace") as f:
            txt = f.read(needed)
        if len(txt) > 3000:
            return txt[800:800 + max_chars]
        return txt[:max_chars]
    except Exception:
        # Best-effort helper: any failure means "no sample available".
        return None
# metadata_filename_hits function · python · L69-L92 (24 LOC)audit_ddl_ingest_xref.py
def metadata_filename_hits(client, coll_name, stem):
    """Count chunks whose metadata names ``stem`` exactly.

    Opens ``coll_name`` on ``client`` and, for each of the metadata fields
    ``filename``, ``source_file`` and ``source``, requests up to five
    records whose field equals ``stem``. The ids returned are tallied
    across all three fields; a failure on one field is ignored.

    Only equality matching is performed. When nothing matched, the
    collection is probed with an empty ``get`` purely to confirm it is
    still readable.

    Returns ``None`` if the collection cannot be opened, otherwise the
    tally (``0`` when there were no matches or the probe failed).
    """
    try:
        collection = client.get_collection(coll_name)
    except Exception:
        return None  # collection missing

    total = 0
    # Exact match on each candidate metadata field.
    for key in ("filename", "source_file", "source"):
        try:
            found = collection.get(where={key: stem}, limit=5, include=["metadatas"])
            if found and found.get("ids"):
                total += len(found["ids"])
        except Exception:
            continue

    if total:
        return total

    # No exact hits: confirm the collection is at least accessible.
    try:
        collection.get(limit=0)
    except Exception:
        return 0
    return total
# semantic_hits function · python · L95-L116 (22 LOC)audit_ddl_ingest_xref.py
def semantic_hits(client, coll_name, text_sample, top_k=3, distance_threshold=0.6):
    """Run a vector query for ``text_sample`` against ``coll_name``.

    Returns a 2-tuple whose shape depends on the outcome:

    * ``(None, None)`` - the collection could not be opened.
    * ``(0, None)`` - the query produced no distances.
    * ``(count, (best, meta))`` - ``count`` is how many returned distances
      are below ``distance_threshold``, ``best`` is the smallest distance
      and ``meta`` is the metadata of the first result (``{}`` when no
      metadata came back).
    * ``(None, "ERR: ...")`` - embedding or querying raised.
    """
    try:
        collection = client.get_collection(coll_name)
    except Exception:
        return None, None

    try:
        response = collection.query(
            query_embeddings=[embed(text_sample)],
            n_results=top_k,
            include=["distances", "metadatas"],
        )
        distances = response["distances"][0] if response.get("distances") else []
        metadatas = response["metadatas"][0] if response.get("metadatas") else []
        if not distances:
            return 0, None
        closest = min(distances)
        near = len([d for d in distances if d < distance_threshold])
        first_meta = metadatas[0] if metadatas else {}
        return near, (closest, first_meta)
    except Exception as e:
        return None, f"ERR: {e}"
# main function · python · L119-L189 (71 LOC)audit_ddl_ingest_xref.py
def main():
print("=" * 80)
print("DDL_INGEST -> DEX-RAG CORPUS CROSS-REFERENCE (READ-ONLY)")
print("=" * 80)
client = get_client()
# list collections present
try:
present = [c.name for c in client.list_collections()]
except Exception as e:
print(f"FATAL: can't list collections: {e}")
return
print(f"\nCollections present: {present}")
print(f"Expected from COLLECTIONS dict: {COLLS}")
missing = [c for c in COLLS if c not in present]
if missing:
print(f"MISSING: {missing}")
print()
# Content-based clusters
for cluster, rel_paths in CLUSTERS.items():
print("-" * 80)
print(f"CLUSTER: {cluster}")
for rel in rel_paths:
full = os.path.join(INGEST_ROOT, rel)
stem = os.path.basename(rel)
print(f"\n FILE: {stem}")
if not os.path.exists(full):
print(f" [missing on disk: {full}]")
continue
samget_collection function · python · L55-L61 (7 LOC)dex-acquire.py
def get_collection(name: str):
    """Open the ChromaDB collection ``name``, creating it if it is absent.

    A persistent client rooted at ``CHROMA_PATH`` is built on every call.
    If the collection cannot be obtained, the error is printed and the
    process exits with status 1.
    """
    store = chromadb.PersistentClient(path=CHROMA_PATH)
    try:
        collection = store.get_or_create_collection(name=name)
    except Exception as e:
        print(f" [ERROR] ChromaDB collection '{name}': {e}")
        sys.exit(1)
    return collection
# fetch_url function · python · L65-L81 (17 LOC)dex-acquire.py
def fetch_url(url: str) -> tuple[str, int]:
    """Fetch ``url`` and reduce its HTML to plain text.

    ``script``, ``style``, ``nav``, ``footer``, ``header`` and ``aside``
    elements are removed before text extraction. Runs of three or more
    newlines collapse to two, and runs of spaces collapse to one.

    Returns:
        ``(text, char_count)`` where ``char_count`` is the length of the
        returned (stripped) text, or ``("", 0)`` if the HTTP request fails.
    """
    headers = {"User-Agent": "Mozilla/5.0 (DexJr RAG Acquisition; DDL Research)"}
    try:
        response = requests.get(url, headers=headers, timeout=REQUEST_TIMEOUT)
        response.raise_for_status()
        soup = BeautifulSoup(response.text, "html.parser")
        # Remove nav, footer, scripts, styles
        for tag in soup(["script", "style", "nav", "footer", "header", "aside"]):
            tag.decompose()
        text = soup.get_text(separator="\n", strip=True)
        # Collapse excessive whitespace
        text = re.sub(r'\n{3,}', '\n\n', text)
        text = re.sub(r' {2,}', ' ', text)
        # Strip before measuring so the reported count matches the text
        # that is actually returned.
        text = text.strip()
        return text, len(text)
    except requests.RequestException:
        return "", 0
# evaluate_content function · python · L109-L136 (28 LOC)dex-acquire.py
def evaluate_content(url: str, text: str, topic: str) -> dict:
"""Ask Dex Jr. to evaluate content quality and relevance."""
if not text:
return {
"overall": 0, "decision": "SKIP",
"reason": "Empty content — page may require JavaScript or blocked fetch.",
"relevance": 0, "quality": 0, "redundancy": "unknown", "usability": 0
}
preview = text[:3000]
prompt = EVAL_PROMPT_TEMPLATE.format(
topic=topic, content_preview=preview, url=url
)
try:
response = ollama.chat(
model=EVAL_MODEL,
messages=[{"role": "user", "content": prompt}],
options={"temperature": 0.1}
)
raw = response["message"]["content"]
return parse_evaluation(raw)
except Exception as e:
return {
"overall": 0, "decision": "FLAG",
"reason": f"Evaluation failed: {e}",
"relevance": 0, "quality": 0, "redundancy": "unknown", "usability": Want fix-PRs on findings? Install Repobility's GitHub App · github.com/apps/repobility-bot
parse_evaluation function · python · L138-L172 (35 LOC)dex-acquire.py
def parse_evaluation(raw: str) -> dict:
"""Parse Dex Jr.'s structured evaluation response."""
result = {
"relevance": 0, "quality": 0, "redundancy": "unknown",
"usability": 0, "overall": 0, "decision": "FLAG", "reason": ""
}
try:
for line in raw.strip().splitlines():
if line.startswith("RELEVANCE:"):
result["relevance"] = float(line.split(":")[1].strip())
elif line.startswith("QUALITY:"):
result["quality"] = float(line.split(":")[1].strip())
elif line.startswith("REDUNDANCY:"):
result["redundancy"] = line.split(":")[1].strip().lower()
elif line.startswith("USABILITY:"):
result["usability"] = float(line.split(":")[1].strip())
elif line.startswith("OVERALL:"):
result["overall"] = float(line.split(":")[1].strip())
elif line.startswith("DECISION:"):
result["decision"] = line.split(":")build_header function · python · L176-L187 (12 LOC)dex-acquire.py
def build_header(url: str, topic: str, score: float, fetch_date: str) -> str:
    """Build the source-attribution header prepended to ingested documents.

    The header lists the URL, its network location, the topic, the quality
    score out of 10, the fetch date and the ingesting tool, followed by a
    60-character ``=`` rule and a blank line.
    """
    fields = [
        f"SOURCE: {url}",
        f"DOMAIN: {urlparse(url).netloc}",
        f"TOPIC: {topic}",
        f"QUALITY_SCORE: {score}/10",
        f"FETCH_DATE: {fetch_date}",
        "INGESTED_BY: dex-acquire.py v1.0",
        "=" * 60,
    ]
    return "\n".join(fields) + "\n\n"
# chunk_text function · python · L189-L197 (9 LOC)dex-acquire.py
def chunk_text(text: str, chunk_size: "int | None" = None,
               overlap: "int | None" = None) -> list[str]:
    """Split ``text`` into fixed-size character chunks that overlap.

    Args:
        text: The text to split.
        chunk_size: Maximum characters per chunk. Defaults to the module
            constant ``CHUNK_SIZE``.
        overlap: Characters shared between consecutive chunks. Defaults to
            the module constant ``CHUNK_OVERLAP``.

    Returns:
        The chunks in order; an empty list for empty ``text``.

    Raises:
        ValueError: If the chunk size is not positive or the overlap is not
            smaller than the chunk size. Such settings would otherwise never
            advance through the text.
    """
    size = CHUNK_SIZE if chunk_size is None else chunk_size
    lap = CHUNK_OVERLAP if overlap is None else overlap
    step = size - lap
    if size <= 0 or step <= 0:
        raise ValueError(
            f"chunk size ({size}) must be positive and larger than overlap ({lap})"
        )
    return [text[start:start + size] for start in range(0, len(text), step)]
# ingest_to_collection function · python · L199-L252 (54 LOC)dex-acquire.py
def ingest_to_collection(url: str, text: str, topic: str,
score: float, collection_name: str) -> int:
"""Ingest content into specified ChromaDB collection. Returns chunk count."""
fetch_date = datetime.now().strftime("%Y-%m-%d")
header = build_header(url, topic, score, fetch_date)
full_text = header + text
chunks = chunk_text(full_text)
if not chunks:
return 0
collection = get_collection(collection_name)
domain = urlparse(url).netloc
url_hash = hashlib.sha256(url.encode()).hexdigest()[:12]
ids, embeddings, metadatas, documents = [], [], [], []
for i, chunk in enumerate(chunks):
chunk_id = f"acq_{url_hash}_{i:04d}"
try:
emb_response = ollama.embeddings(model=EMBED_MODEL, prompt=chunk)
embedding = emb_response["embedding"]
except Exception as e:
print(f" [WARN] Embedding failed for chunk {i}: {e}")
continue
ids.append(chunk_save_to_folder function · python · L256-L273 (18 LOC)dex-acquire.py
def save_to_folder(url: str, text: str, topic: str, score: float,
                   subfolder: str, base_dir: str) -> str:
    """Write acquired content to a ``.txt`` file and return its path.

    The file lives in ``base_dir/subfolder`` (created if needed). Its name
    is the URL's network location with dots turned into underscores,
    followed by up to 40 characters of the URL path with slashes turned
    into underscores when the path is non-empty. The content is the
    attribution header from ``build_header`` followed by ``text``.
    """
    parsed = urlparse(url)
    domain = parsed.netloc.replace(".", "_")
    path_slug = parsed.path.strip("/").replace("/", "_")[:40]
    if path_slug:
        filename = f"{domain}_{path_slug}.txt"
    else:
        filename = f"{domain}.txt"

    target_dir = Path(base_dir) / subfolder
    target_dir.mkdir(parents=True, exist_ok=True)
    target = target_dir / filename

    today = datetime.now().strftime("%Y-%m-%d")
    document = build_header(url, topic, score, today) + text
    target.write_text(document, encoding="utf-8")
    return str(target)
# log_result function · python · L277-L279 (3 LOC)dex-acquire.py
def log_result(entry: dict):
    """Append ``entry`` to ``LOG_FILE`` as a single JSON line."""
    with open(LOG_FILE, "a", encoding="utf-8") as log:
        print(json.dumps(entry), file=log)
# print_report function · python · L283-L331 (49 LOC)dex-acquire.py
def print_report(results: list[dict], topic: str, auto_ingest: bool, collection: str):
ingested = [r for r in results if r["action"] == "INGESTED"]
flagged = [r for r in results if r["action"] == "FLAGGED"]
skipped = [r for r in results if r["action"] == "SKIPPED"]
failed = [r for r in results if r["action"] == "FAILED"]
total_chunks = sum(r.get("chunks_added", 0) for r in ingested)
print("\n" + "="*60)
print(" DEX JR. ACQUISITION REPORT")
print(f" Topic: {topic}")
print(f" Mode: {'AUTO-INGEST' if auto_ingest else 'REVIEW ONLY'}")
print(f" Collection: {collection}")
print(f" Run: {datetime.now().strftime('%Y-%m-%d %H:%M')}")
print("="*60)
print(f"\n URLS PROCESSED: {len(results)}")
print(f" INGESTED: {len(ingested)} ({total_chunks} chunks added)")
print(f" FLAGGED: {len(flagged)} (needs review)")
print(f" SKIPPED: {len(skipped)}")
print(f" FAILED: {len(failed)}")
if ingested:
printload_urls function · python · L335-L351 (17 LOC)dex-acquire.py
def load_urls(source: str) -> list[str]:
    """Load URLs from a text file (one per line) or a JSON plan.

    When ``source`` ends in ``.json`` the file may hold either a list whose
    items are URL strings or dicts with a ``"url"`` key, or a dict with a
    ``"urls"`` list. Otherwise the file is read line by line; blank lines
    and lines whose first non-blank character is ``#`` are skipped, and
    surrounding whitespace is removed from each URL.

    Files are read as UTF-8, tolerating a leading byte-order mark.

    Prints an error and exits with status 1 if ``source`` does not exist.
    """
    path = Path(source)
    if not path.exists():
        print(f"[ERROR] File not found: {source}")
        sys.exit(1)
    if source.endswith(".json"):
        with open(path, encoding="utf-8-sig") as f:
            plan = json.load(f)
        # Support {"urls": [...]} or [{"url": ..., "topic": ...}]
        if isinstance(plan, list):
            return [item["url"] if isinstance(item, dict) else item for item in plan]
        return plan.get("urls", [])
    with open(path, encoding="utf-8-sig") as f:
        # Strip first so indented comment lines are recognised as comments.
        stripped = (line.strip() for line in f)
        return [line for line in stripped if line and not line.startswith("#")]
# Repobility (the analyzer behind this table) · https://repobility.com
main function · python · L355-L484 (130 LOC)dex-acquire.py
def main():
parser = argparse.ArgumentParser(
description="Batch URL acquisition with Dex Jr. quality evaluation"
)
parser.add_argument("--topic", required=False, help="Topic for relevance evaluation")
parser.add_argument("--urls", help="File of URLs (one per line)")
parser.add_argument("--from-plan", help="JSON acquisition plan")
parser.add_argument("--auto-ingest", action="store_true",
help="Automatically ingest content scoring 7+/10")
parser.add_argument("--review-only", action="store_true",
help="Evaluate only, do not ingest (default)")
parser.add_argument("--collection", default="ext_archive",
help="ChromaDB collection (default: ext_archive)")
parser.add_argument("--save-dir", default="acquisitions",
help="Base directory to save flagged/reviewed files")
args = parser.parse_args()
# Validate
if not args.urls and RestoreTestFailedError class · python · L50-L61 (12 LOC)dex-backup.py
class RestoreTestFailedError(Exception):
"""
Raised by restore_test() when a backup fails post-creation verification.
Trigger 6 of STD-DDL-BACKUP-001 (pending formalization). The backup
directory that failed the restore test is NOT automatically renamed
or deleted — the operator inspects it manually. The exception message
contains the full diff (collection counts, missing collections, etc.).
"""
def __init__(self, message: str, result: dict | None = None):
super().__init__(message)
self.result = result__init__ method · python · L59-L61 (3 LOC)dex-backup.py
def __init__(self, message: str, result: dict | None = None):
super().__init__(message)
self.result = resultcleanup_stale_scratch function · python · L72-L105 (34 LOC)dex-backup.py
def cleanup_stale_scratch(max_age_hours: float = 1.0) -> int:
"""
Reclaim orphan restore_test_* directories in dex-rag-scratch/ that
are older than max_age_hours.
Exists because restore_test()'s in-process cleanup can fail on
Windows when ChromaDB's HNSW data_level0.bin is still memory-mapped
after `del client` + `gc.collect()`. The mmap is released when the
Python process exits, so the *next* dex-backup.py run can safely
remove what the previous run left behind.
Tolerates file-lock errors by logging a WARN and continuing.
Returns the count of directories actually removed.
"""
scratch_root = Path(__file__).parent.parent / "dex-rag-scratch"
if not scratch_root.exists():
return 0
removed = 0
cutoff = datetime.now(timezone.utc).timestamp() - (max_age_hours * 3600)
for child in scratch_root.iterdir():
if not child.is_dir():
continue
if not child.name.startswith("restore_test_"):
find_existing_backups function · python · L108-L117 (10 LOC)dex-backup.py
def find_existing_backups() -> list[Path]:
    """List existing backup directories under ``BACKUP_ROOT``, newest first.

    A backup is any directory whose name starts with ``chromadb_`` and does
    not end with ``_FAILED``. Entries are ordered by name, descending.
    Returns an empty list when ``BACKUP_ROOT`` does not exist.
    """
    if not BACKUP_ROOT.exists():
        return []

    def _is_backup(entry):
        name = entry.name
        return (
            entry.is_dir()
            and name.startswith("chromadb_")
            and not name.endswith("_FAILED")
        )

    candidates = filter(_is_backup, BACKUP_ROOT.iterdir())
    return sorted(candidates, key=lambda p: p.name, reverse=True)
# get_live_chunk_count function · python · L120-L128 (9 LOC)dex-backup.py
def get_live_chunk_count() -> int:
    """Return the total chunk count across all live collections.

    The client comes from ``dex_weights.get_client``; the directory holding
    this script is put on ``sys.path`` so that module can be imported.
    """
    script_dir = str(Path(__file__).parent)
    # Insert only once: this function may be called repeatedly, and an
    # unconditional insert would add a duplicate entry on every call.
    if script_dir not in sys.path:
        sys.path.insert(0, script_dir)
    from dex_weights import get_client

    client = get_client()
    return sum(
        client.get_collection(col.name).count()
        for col in client.list_collections()
    )
# read_manifest function · python · L131-L139 (9 LOC)dex-backup.py
def read_manifest(backup_dir: Path) -> dict | None:
"""Read a backup's _manifest.json. Returns None if missing or invalid."""
manifest_path = backup_dir / "_manifest.json"
if not manifest_path.exists():
return None
try:
return json.loads(manifest_path.read_text(encoding="utf-8"))
except (json.JSONDecodeError, OSError):
return Nonecheck_triggers function · python · L142-L182 (41 LOC)dex-backup.py
def check_triggers(expected_write_chunks: int = 0) -> tuple[bool, list[str]]:
"""
Check all backup triggers per STD-DDL-BACKUP-001.
Returns (should_backup, list_of_triggers_that_fired).
"""
fired = []
backups = find_existing_backups()
if not backups:
# No backups exist at all — Trigger 1 effectively fires
fired.append("no_existing_backups")
return (True, fired)
most_recent = backups[0]
manifest = read_manifest(most_recent)
if manifest is None:
fired.append("most_recent_manifest_invalid")
return (True, fired)
# Trigger 1: time-based
last_backup_at = datetime.fromisoformat(manifest["created_at"].replace("Z", "+00:00"))
age = datetime.now(timezone.utc) - last_backup_at
if age > timedelta(days=TRIGGER_DAYS):
fired.append(f"time_based_age_{age.days}d")
# Trigger 2: volume-based
try:
live_count = get_live_chunk_count()
backup_count = manifest.get("total_chunk_cRepobility — same analyzer, your code, free for public repos · /scan/
build_check_status function · python · L185-L266 (82 LOC)dex-backup.py
def build_check_status(expected_write_chunks: int = 0) -> dict:
"""
Lightweight status of the most recent backup, suitable for the
--check-only --json path consumed by ensure_backup_current().
Does NOT call validate_backup() — that function compares the backup
against current live state and would fail on any drift since the
backup was taken. This function checks existence + manifest + sqlite
readability only, plus runs check_triggers() to report what would
fire if a write happened now.
"""
result = {
"exists": False,
"most_recent": None,
"most_recent_path": None,
"manifest_valid": False,
"sqlite_present": False,
"sqlite_readable": False,
"age_hours": None,
"created_at": None,
"total_chunk_count": None,
"triggers_to_fire": [],
"should_backup": False,
"live_chunk_count": None,
"expected_write_chunks": expected_write_chunks,
}
backups = sha256_file function · python · L269-L274 (6 LOC)dex-backup.py
def sha256_file(path: Path) -> str:
    """Return the hex SHA-256 digest of the file at ``path``.

    The file is read in 64 KiB blocks so it never has to fit in memory.
    """
    digest = hashlib.sha256()
    with open(path, "rb") as stream:
        while True:
            block = stream.read(65536)
            if not block:
                break
            digest.update(block)
    return digest.hexdigest()
# query_collection_state function · python · L277-L302 (26 LOC)dex-backup.py
def query_collection_state(sqlite_path: Path) -> dict[str, int]:
"""
Query a ChromaDB SQLite file for collection names and chunk counts.
Read-only. Returns dict of {collection_name: chunk_count}.
"""
db_uri = f"file:/{str(sqlite_path).replace(chr(92), '/')}?mode=ro"
con = sqlite3.connect(db_uri, uri=True)
cur = con.cursor()
cur.execute("SELECT id, name FROM collections")
collections = cur.fetchall()
result = {}
for col_id, col_name in collections:
# Count via embeddings table joined through segments
# Fall back to a simpler count if join is unreliable
try:
cur.execute("""
SELECT COUNT(*) FROM embeddings e
JOIN segments s ON e.segment_id = s.id
WHERE s.collection = ?
""", (col_id,))
result[col_name] = cur.fetchone()[0]
except sqlite3.OperationalError:
# Schema variant — just total embeddings, not per collection
perform_backup function · python · L305-L366 (62 LOC)dex-backup.py
def perform_backup(dry_run: bool = False) -> tuple[bool, Path | None, dict]:
"""
Run the actual backup. Returns (success, backup_path, manifest).
"""
started_at = datetime.now(timezone.utc)
timestamp = utc_now_compact()
backup_dir = BACKUP_ROOT / f"chromadb_{timestamp}"
if dry_run:
print(f"[DRY RUN] Would create backup at: {backup_dir}")
return (True, None, {})
print(f"Creating backup at: {backup_dir}")
BACKUP_ROOT.mkdir(parents=True, exist_ok=True)
backup_dir.mkdir(parents=True, exist_ok=False)
# Step 1: SQLite backup API for chroma.sqlite3
src_sqlite = LIVE_CHROMADB / "chroma.sqlite3"
dst_sqlite = backup_dir / "chroma.sqlite3"
print(f" SQLite backup: {src_sqlite.name}")
src_con = sqlite3.connect(str(src_sqlite))
dst_con = sqlite3.connect(str(dst_sqlite))
src_con.backup(dst_con)
src_con.close()
dst_con.close()
# Step 2: shutil.copytree for each UUID segment directory
for item in LIvalidate_backup function · python · L369-L429 (61 LOC)dex-backup.py
def validate_backup(backup_dir: Path, manifest: dict) -> tuple[bool, list[str]]:
"""Validate a backup against STD-DDL-BACKUP-001 §"Validation rules"."""
failures = []
sqlite_path = backup_dir / "chroma.sqlite3"
if not sqlite_path.exists():
failures.append("chroma.sqlite3 missing in backup")
return (False, failures)
src_size = (LIVE_CHROMADB / "chroma.sqlite3").stat().st_size
dst_size = sqlite_path.stat().st_size
if abs(dst_size - src_size) / src_size > 0.05:
failures.append(f"size delta >5%: src={src_size}, dst={dst_size}")
try:
db_uri = f"file:/{str(sqlite_path).replace(chr(92), '/')}?mode=ro"
con = sqlite3.connect(db_uri, uri=True)
con.close()
except Exception as e:
failures.append(f"backup sqlite won't open read-only: {e}")
src_state = query_collection_state(LIVE_CHROMADB / "chroma.sqlite3")
dst_state = manifest["collections"]
if set(src_state.keys()) != set(dst_state.keys()):restore_test function · python · L432-L614 (183 LOC)dex-backup.py
def restore_test(backup_path: "Path | None" = None) -> dict:
"""
Trigger 6 — post-backup restore verification.
Copies a backup to a scratch location outside the repo and outside
OneDrive, opens it as a fresh ChromaDB PersistentClient, enumerates
all collections, counts each one, and compares the result to the
manifest's recorded counts. Any mismatch raises
RestoreTestFailedError with full diff detail.
The original backup directory is never opened or modified — only
the scratch copy is touched. Scratch is always cleaned up via a
try/finally, even on failure. On Windows, a GC hint + one retry
with a small delay handles the SQLite file-lock case.
Args:
backup_path: path to a backup directory. If None, uses the
most recent backup from find_existing_backups().
Returns:
dict with test result:
backup_tested: backup dir name
scratch_path: scratch location (deleted by the time this returnrotate_backups function · python · L617-L659 (43 LOC)dex-backup.py
def rotate_backups() -> tuple[bool, list[str]]:
"""Apply retention policy. Returns (success, list_of_pruned_paths)."""
backups = find_existing_backups()
if len(backups) <= RETAIN_DAILY:
return (True, [])
keep = set()
# Keep last 7 daily
for b in backups[:RETAIN_DAILY]:
keep.add(b.name)
# Keep last 4 weekly (one per week from older backups)
weeks_kept = set()
for b in backups[RETAIN_DAILY:]:
manifest = read_manifest(b)
if not manifest:
continue
created = datetime.fromisoformat(manifest["created_at"].replace("Z", "+00:00"))
week_key = created.strftime("%Y-W%U")
if week_key not in weeks_kept and len(weeks_kept) < RETAIN_WEEKLY:
weeks_kept.add(week_key)
keep.add(b.name)
# Keep last 3 monthly
months_kept = set()
for b in backups[RETAIN_DAILY:]:
manifest = read_manifest(b)
if not manifest:
continue
created = datetiappend_log function · python · L662-L665 (4 LOC)dex-backup.py
def append_log(entry: dict) -> None:
    """Append ``entry`` to ``BACKUP_LOG`` as a single JSON line.

    ``BACKUP_ROOT`` is created first if it does not exist yet.
    """
    BACKUP_ROOT.mkdir(parents=True, exist_ok=True)
    with open(BACKUP_LOG, "a", encoding="utf-8") as log:
        print(json.dumps(entry), file=log)
# Powered by Repobility — scan your code at https://repobility.com
main function · python · L668-L854 (187 LOC)dex-backup.py
def main():
parser = argparse.ArgumentParser(description="ChromaDB backup per STD-DDL-BACKUP-001")
parser.add_argument("--force", action="store_true", help="Backup regardless of triggers")
parser.add_argument("--dry-run", action="store_true", help="Check triggers, report, no copy")
parser.add_argument("--rotate-only", action="store_true", help="Skip backup, just rotate")
parser.add_argument("--check-only", action="store_true", help="Validate most recent backup")
parser.add_argument("--json", action="store_true", help="Output structured JSON (only with --check-only)")
parser.add_argument("--restore-test", action="store_true", help="Run Trigger 6 restore test on most recent backup")
parser.add_argument("--skip-restore-test", action="store_true", help="Skip Trigger 6 restore test after backup creation")
parser.add_argument("--expected-chunks", type=int, default=0, help="For pre-batch trigger")
args = parser.parse_args()
# Suppress banner in --cget_embedding function · python · L74-L85 (12 LOC)dex-bridge.py
def get_embedding(text):
    """Request an embedding for ``text`` from ``OLLAMA_EMBED_URL``.

    Posts the model name ``EMBED_MODEL`` and the text with a 60-second
    timeout. Returns the ``"embedding"`` value of the JSON reply (``None``
    if the key is absent). On any failure the error is printed and ``None``
    is returned.
    """
    try:
        payload = {"model": EMBED_MODEL, "prompt": text}
        reply = requests.post(OLLAMA_EMBED_URL, json=payload, timeout=60)
        reply.raise_for_status()
        body = reply.json()
        return body.get("embedding")
    except Exception as e:
        print(f" [ERROR] Embedding failed: {e}")
        return None
# retrieve function · python · L90-L140 (51 LOC)dex-bridge.py
def retrieve(query, top_k=TOP_K, use_raw=False, include_external=False):
"""
Returns (chunks, provenance_string).
chunks: list of dicts with text, source, distance, label, weighted_score
provenance: e.g. "[Sources: 3xCanon | 1xArchive | 1xExtCanon]"
"""
if use_raw:
# Legacy unweighted path - single collection
embedding = get_embedding(query)
if not embedding:
return [], "[Sources: none]"
client = get_client()
collection = client.get_collection(RAW_COLLECTION)
results = collection.query(
query_embeddings=[embedding],
n_results=top_k,
include=["documents", "metadatas", "distances"],
)
chunks = []
if results and results["documents"]:
for i, doc in enumerate(results["documents"][0]):
meta = results["metadatas"][0][i] if results["metadatas"] else {}
dist = results["distances"][0][i] if results["distances"]build_context function · python · L145-L158 (14 LOC)dex-bridge.py
def build_context(chunks, provenance, max_chars=MAX_CONTEXT_CHARS):
    """Assemble the retrieved chunks into one context string.

    The result starts with ``provenance`` and a blank line, followed by one
    entry per chunk of the form
    ``[Source N: <source> | <label> score=<weighted_score>]`` plus the chunk
    text. The score part is omitted when the chunk has no
    ``weighted_score``. Chunks are added in order until the next entry would
    push the running character total past ``max_chars``; that entry and all
    later ones are dropped.

    Args:
        chunks: Dicts with ``source`` and ``text`` keys and optional
            ``label`` / ``weighted_score`` keys.
        provenance: Provenance summary line placed first.
        max_chars: Character budget for provenance plus entries.
    """
    context_parts = [provenance, ""]
    total_chars = len(provenance)
    for number, chunk in enumerate(chunks, start=1):
        label = chunk.get("label", "")
        weighted = chunk.get("weighted_score")
        # Compare against None rather than truthiness: a legitimate score
        # of 0.0 is falsy and would otherwise vanish from the source tag.
        score = f" score={weighted:.4f}" if weighted is not None else ""
        entry = f"[Source {number}: {chunk['source']} | {label}{score}]\n{chunk['text']}\n"
        if total_chars + len(entry) > max_chars:
            break
        context_parts.append(entry)
        total_chars += len(entry)
    return "\n".join(context_parts)
# generate function · python · L163-L191 (29 LOC)dex-bridge.py
def generate(query, context, model=DEFAULT_MODEL, chat_url=OLLAMA_CHAT_URL):
    """Ask the model at ``chat_url`` to answer ``query`` using ``context``.

    The retrieved context and the question are wrapped in a fixed prompt
    and posted as a non-streaming request (temperature 0.3, ``num_ctx``
    8192, 120-second timeout).

    Returns the ``"response"`` field of the JSON reply, ``"[No response]"``
    if that field is absent, or an ``"[ERROR] Generation failed: ..."``
    string if the request raises.
    """
    prompt = f"""The following context was retrieved from the DDL knowledge base to help answer the question. Use this context to inform your answer. If the context does not contain relevant information, say so.
RETRIEVED CONTEXT:
{context}
QUESTION:
{query}
Answer based on the retrieved context and your governance training. Cite sources by number when referencing specific retrieved documents."""
    request_body = {
        "model": model,
        "prompt": prompt,
        "stream": False,
        "options": {"temperature": 0.3, "num_ctx": 8192},
    }
    try:
        reply = requests.post(chat_url, json=request_body, timeout=120)
        reply.raise_for_status()
        return reply.json().get("response", "[No response]")
    except Exception as e:
        return f"[ERROR] Generation failed: {e}"
# auto_ingest function · python · L196-L220 (25 LOC)dex-bridge.py
def auto_ingest(query, response, provenance, sources):
"""Write query+response transcript to bridge-ingest folder and trigger fast canon ingest."""
try:
os.makedirs(BRIDGE_INGEST_DIR, exist_ok=True)
ts = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
filename = f"bridge_{ts}.txt"
filepath = os.path.join(BRIDGE_INGEST_DIR, filename)
with open(filepath, "w", encoding="utf-8") as f:
f.write(f"DEX JR RAG BRIDGE - Query Transcript\n")
f.write(f"{'='*60}\n")
f.write(f"TIMESTAMP: {datetime.datetime.now().isoformat()}\n")
f.write(f"PROVENANCE: {provenance}\n")
f.write(f"SOURCES: {', '.join(sources)}\n")
f.write(f"{'='*60}\n\n")
f.write(f"QUERY:\n{query}\n\n")
f.write(f"RESPONSE:\n{response}\n")
if os.path.exists(INGEST_SCRIPT):
subprocess.run(
["python", INGEST_SCRIPT, "--path", BRIDGE_INGEST_DIR, "--build-canonlog_interaction function · python · L225-L245 (21 LOC)dex-bridge.py
def log_interaction(query, chunks, provenance, response, model, use_raw, include_external):
    """Append a JSON record of one query/response exchange to ``LOG_FILE``.

    The record holds the timestamp, query, model, the collection name
    (``RAW_COLLECTION`` when ``use_raw`` else ``CANON_COLLECTION``), the
    external flag, provenance, per-chunk sources, labels, weighted scores
    and distances, plus the response length and its first 200 characters.
    A failure to write the log is reported as a warning and not raised.
    """
    if use_raw:
        collection_name = RAW_COLLECTION
    else:
        collection_name = CANON_COLLECTION

    entry = {
        "timestamp": datetime.datetime.now().isoformat(),
        "query": query,
        "model": model,
        "collection": collection_name,
        "include_external": include_external,
        "provenance": provenance,
        "chunks_retrieved": len(chunks),
    }
    # Per-chunk columns, added in the order they appear in the log record.
    entry["sources"] = [item["source"] for item in chunks]
    entry["labels"] = [item.get("label", "") for item in chunks]
    entry["weighted_scores"] = [item.get("weighted_score") for item in chunks]
    entry["distances"] = [item["distance"] for item in chunks]
    entry["response_length"] = len(response)
    entry["response_preview"] = response[:200]

    try:
        with open(LOG_FILE, "a", encoding="utf-8") as log:
            log.write(json.dumps(entry, ensure_ascii=False) + "\n")
    except Exception as e:
        print(f" [WARN] Logging failed: {e}")
# display_results function · python · L250-L280 (31 LOC)dex-bridge.py
def display_results(query, chunks, provenance, response, verbose=False):
print()
print("=" * 60)
print(f" QUERY: {query}")
print(f" {provenance}")
print("=" * 60)
print()
if verbose:
print("-" * 60)
print(" RETRIEVED CONTEXT:")
print("-" * 60)
for i, chunk in enumerate(chunks):
dist = f"{chunk['distance']:.4f}" if chunk['distance'] is not None else "?"
score = f" weighted={chunk['weighted_score']:.4f}" if chunk.get("weighted_score") else ""
label = chunk.get("label", "")
print(f"\n [{i+1}] [{label}] distance={dist}{score}")
print(f" source: {chunk['source']}")
print(f" {chunk['text'][:200]}...")
print()
print("-" * 60)
print(" ANSWER:")
print("-" * 60)
print()
print(response)
print()
print("-" * 60)
print(f" Sources: {', '.join(c['source'] for c in chunks)}")
print(f" Log: {LOG_FILE}")
print("=" * Want fix-PRs on findings? Install Repobility's GitHub App · github.com/apps/repobility-bot
interactive function · python · L285-L335 (51 LOC)dex-bridge.py
def interactive(model=DEFAULT_MODEL, use_raw=False, include_external=False,
top_k=TOP_K, verbose=False):
print()
print("=" * 60)
print(" DEX JR RAG BRIDGE - Interactive Mode v1.2")
print(f" Model: {model} | Collection: {'archive' if use_raw else 'canon'}")
ext_label = " + external" if include_external else ""
print(f" Top-K: {top_k}{ext_label} | Type 'quit' to exit")
print("=" * 60)
print()
while True:
try:
query = input(">>> ").strip()
except (EOFError, KeyboardInterrupt):
print("\nExiting.")
break
if not query:
continue
if query.lower() in ("quit", "exit", "/bye"):
print("Exiting.")
break
# Inline flags
current_raw = use_raw
current_external = include_external
if query.startswith("--raw "):
current_raw = True
query = query[6:].strip()
if query.startswith("--exmain function · python · L340-L390 (51 LOC)dex-bridge.py
def main():
parser = argparse.ArgumentParser(description="Dex Jr RAG Bridge v1.2 - Weighted Query + Generate + Auto-Ingest")
parser.add_argument("query", nargs="?", default=None, help="Question to ask")
parser.add_argument("--raw", action="store_true", help="Search archive instead of canon (unweighted)")
parser.add_argument("--external", action="store_true", help="Include ext_canon and ext_archive in search")
parser.add_argument("--model", default=DEFAULT_MODEL, help="Ollama model to use")
parser.add_argument("--top", type=int, default=TOP_K, help="Number of chunks to retrieve")
parser.add_argument("--verbose", action="store_true", help="Show retrieved chunks with scores")
parser.add_argument("--interactive", action="store_true", help="Interactive mode")
parser.add_argument("--node", default="local", help="Inference node: local or laptop")
parser.add_argument("--no-ingest", action="store_true", help="source_header function · python · L66-L73 (8 LOC)dex-convert.py
def source_header(source_path: str, file_type: str, converted_date: str) -> str:
    """Build the attribution header placed at the top of converted files.

    Lists the source path, file type, conversion date and converting tool,
    followed by a 60-character ``=`` rule and a blank line.
    """
    lines = (
        f"SOURCE: {source_path}",
        f"TYPE: {file_type}",
        f"CONVERTED: {converted_date}",
        "CONVERTED_BY: dex-convert.py v1.0",
        "=" * 60,
    )
    return "\n".join(lines) + "\n\n"
# clean_text function · python · L75-L80 (6 LOC)dex-convert.py
def clean_text(text: str) -> str:
    """Remove null bytes, cap blank-line and space runs, and trim the ends.

    Runs of four or more newlines become three, and runs of three or more
    spaces are collapsed.
    """
    cleaned = text.replace("\x00", "")
    for pattern, replacement in ((r'\n{4,}', '\n\n\n'), (r' {3,}', ' ')):
        cleaned = re.sub(pattern, replacement, cleaned)
    return cleaned.strip()
# safe_filename function · python · L82-L84 (3 LOC)dex-convert.py
def safe_filename(name: str) -> str:
    """Return ``name`` made filesystem-safe and capped at 80 characters.

    Every character that is not a word character, hyphen, underscore or dot
    is replaced with an underscore.
    """
    sanitized = re.sub(r'[^\w\-_.]', '_', name)
    return sanitized[:80]
# write_output function · python · L89-L95 (7 LOC)dex-convert.py
def write_output(content: str, out_path: Path, label: str):
    """Write ``content`` to ``out_path`` as UTF-8 and print a confirmation.

    Parent directories are created as needed. Characters that cannot be
    encoded are replaced. The confirmation shows ``label``, the output file
    name and its size in KB.
    """
    out_path.parent.mkdir(parents=True, exist_ok=True)
    out_path.write_text(content, encoding="utf-8", errors="replace")
    kilobytes = out_path.stat().st_size / 1024
    print(f" [OK] {label}")
    print(f" → {out_path.name} ({kilobytes:.1f} KB)")
# convert_html function · python · L99-L128 (30 LOC)dex-convert.py
def convert_html(file_path: Path, out_dir: Path, chunk_size: int = 0) -> list[Path]:
"""Strip HTML to clean text. Optionally chunk large files."""
converted_date = datetime.now().strftime("%Y-%m-%d")
header = source_header(str(file_path), "html", converted_date)
if BS4_AVAILABLE:
with open(file_path, encoding="utf-8", errors="replace") as f:
soup = BeautifulSoup(f, "html.parser")
for tag in soup(["script", "style", "nav", "footer", "head"]):
tag.decompose()
text = soup.get_text(separator="\n", strip=True)
else:
# Fallback: regex strip
with open(file_path, encoding="utf-8", errors="replace") as f:
raw = f.read()
text = re.sub(r'<[^>]+>', ' ', raw)
text = re.sub(r' ', ' ', text)
text = re.sub(r'&', '&', text)
text = re.sub(r'<', '<', text)
text = re.sub(r'>', '>', text)
text = clean_text(text)
full_content = header + text
ifdetect_reddit_type function · python · L132-L138 (7 LOC)dex-convert.py
def detect_reddit_type(filename: str) -> str:
    """Detect the Reddit CSV type from ``filename``.

    The filename is lower-cased and stripped of underscores and hyphens;
    the first key of ``REDDIT_CSV_SCHEMAS`` that (with its underscores
    removed) occurs in it is returned. Falls back to ``"generic"``.
    """
    normalized = filename.lower().replace("_", "").replace("-", "")
    matches = (
        key for key in REDDIT_CSV_SCHEMAS
        if key.replace("_", "") in normalized
    )
    return next(matches, "generic")
# Repobility (the analyzer behind this table) · https://repobility.com
convert_reddit_csv function · python · L140-L198 (59 LOC)dex-convert.py
def convert_reddit_csv(file_path: Path, out_dir: Path) -> list[Path]:
"""Convert Reddit CSV export to readable text."""
converted_date = datetime.now().strftime("%Y-%m-%d")
reddit_type = detect_reddit_type(file_path.stem)
header = source_header(str(file_path), f"reddit-csv-{reddit_type}", converted_date)
lines = []
try:
with open(file_path, encoding="utf-8", errors="replace", newline="") as f:
reader = csv.DictReader(f)
for i, row in enumerate(reader):
entry_lines = [f"--- Entry {i+1} ---"]
# Date
date = row.get("date", row.get("Date", ""))
if date:
entry_lines.append(f"Date: {date}")
# Subreddit
sub = row.get("subreddit", row.get("Subreddit", ""))
if sub:
entry_lines.append(f"Subreddit: r/{sub}")
# Title (posts)
title = row.get("titleconvert_csv_generic function · python · L200-L222 (23 LOC)dex-convert.py
def convert_csv_generic(file_path: Path, out_dir: Path) -> list[Path]:
    """Convert any CSV file into a readable text file.

    Each row becomes a ``--- Row N ---`` section listing ``column: value``
    for every non-blank value. The result, prefixed with the attribution
    header, is written to ``<stem>_converted.txt`` in ``out_dir``.

    Returns a one-element list with the output path, or an empty list
    (after printing a warning) if the CSV cannot be read or parsed.
    """
    today = datetime.now().strftime("%Y-%m-%d")
    header = source_header(str(file_path), "csv", today)
    sections = []
    try:
        with open(file_path, encoding="utf-8", errors="replace", newline="") as f:
            for number, row in enumerate(csv.DictReader(f), start=1):
                fields = [
                    f"{key}: {val.strip()}\n"
                    for key, val in row.items()
                    if val and val.strip()
                ]
                sections.append(f"--- Row {number} ---\n" + "".join(fields))
    except Exception as e:
        print(f" [WARN] CSV parse error: {e}")
        return []
    out_path = out_dir / f"{file_path.stem}_converted.txt"
    write_output(header + "\n".join(sections), out_path, file_path.name)
    return [out_path]
# convert_json function · python · L226-L269 (44 LOC)dex-convert.py
def convert_json(file_path: Path, out_dir: Path, chunk_size: int = 0) -> list[Path]:
"""Convert JSON to readable text. Handles Chrome history and generic JSON."""
converted_date = datetime.now().strftime("%Y-%m-%d")
header = source_header(str(file_path), "json", converted_date)
try:
with open(file_path, encoding="utf-8", errors="replace") as f:
data = json.load(f)
except Exception as e:
print(f" [WARN] JSON parse error: {e}")
return []
lines = []
# Chrome history detection
if isinstance(data, dict) and "Browser History" in data:
items = data["Browser History"]
lines.append(f"GOOGLE CHROME HISTORY — {len(items)} entries\n")
for item in items:
title = item.get("title", "")
url = item.get("url", "")
ts = item.get("time_usec", "")
if ts:
try:
dt = datetime.fromtimestamp(int(ts) / 1_000_000)
page 1 / 4next ›