Function bodies 161 total
append_reset_log function · python · L157-L167 (11 LOC)dex-ingest.py
def append_reset_log(entry: dict) -> None:
    """
    Append *entry* as a single JSON line to the reset log (RESET_LOG).

    The log is forensic only: the backup is the real recovery path, so a
    failed write (including a non-serializable entry) is reported on stderr
    and otherwise ignored; it never blocks the reset.
    """
    try:
        with open(RESET_LOG, "a", encoding="utf-8") as log_fh:
            serialized = json.dumps(entry)
            log_fh.write(serialized + "\n")
    except Exception as exc:
        # Deliberately non-blocking: warn, never raise.
        print(f" WARN: reset log write failed (non-blocking): {exc}", file=sys.stderr)
# get_embedding function · python · L173-L184 (12 LOC)dex-ingest.py
def get_embedding(text: str) -> Optional[List[float]]:
    """
    Embed *text* with EMBED_MODEL through the Ollama endpoint at OLLAMA_URL.

    Returns the value of the response's "embedding" key (None when the key
    is absent). Any failure (network, HTTP status, JSON decoding) is printed
    and converted to a None return instead of raising.
    """
    try:
        payload = {"model": EMBED_MODEL, "prompt": text}
        response = requests.post(OLLAMA_URL, json=payload, timeout=60)
        response.raise_for_status()
        return response.json().get("embedding")
    except Exception as exc:
        print(f" ? Embedding failed: {exc}")
        return None
# chunk_text function · python · L190-L236 (47 LOC)dex-ingest.py
def chunk_text(text: str) -> List[str]:
"""
Chunk a text string into overlapping segments.
This is intentionally simple and safe for most files.
Guardrail for extremely large files is handled before calling this.
"""
char_size = CHUNK_SIZE_TOKENS * CHARS_PER_TOKEN
char_overlap = CHUNK_OVERLAP_TOKENS * CHARS_PER_TOKEN
t = (text or "").strip()
if not t:
return []
if len(t) <= char_size:
return [t]
out: List[str] = []
start = 0
n = len(t)
while start < n:
end = min(n, start + char_size)
# Prefer breaking on paragraph/sentence boundary (best-effort)
if end < n:
para = t.rfind("\n\n", start + char_size // 2, end)
if para > start:
end = para + 2
else:
for sep in [". ", ".\n", "!\n", "?\n", "! ", "? "]:
brk = t.rfind(sep, start + char_size // 2, end)
if brk > start:
sha256_file function · python · L242-L250 (9 LOC)dex-ingest.py
def sha256_file(path: str) -> Optional[str]:
    """
    Return the hex SHA-256 digest of the file at *path*, or None on failure.

    The file is streamed in 8192-byte blocks, so it is never held in memory
    whole. Any exception while opening or reading yields None.
    """
    digest = hashlib.sha256()
    try:
        with open(path, "rb") as fh:
            while True:
                block = fh.read(8192)
                if not block:
                    break
                digest.update(block)
    except Exception:
        return None
    return digest.hexdigest()
# read_text_file function · python · L253-L261 (9 LOC)dex-ingest.py
def read_text_file(path: str) -> Optional[str]:
    """
    Read *path* as text, trying a short list of encodings in order.

    "utf-8-sig" is tried first. It decodes BOM-less UTF-8 exactly like
    "utf-8", but it also strips a leading byte-order mark. The previous
    order tried plain "utf-8" first. That accepted BOM files and left U+FEFF
    at the start of the text, so the "utf-8-sig" fallback could never be
    reached. "latin-1" maps every byte value, so it acts as the catch-all.

    Returns:
        The decoded text, or None if the file cannot be opened/read (or the
        path value itself is invalid).
    """
    for enc in ("utf-8-sig", "cp1252", "latin-1"):
        try:
            with open(path, "r", encoding=enc) as f:
                return f.read()
        except UnicodeDecodeError:
            # Wrong guess for this file; try the next candidate encoding.
            continue
        except (OSError, ValueError, TypeError):
            # Missing/unreadable file or bad path value: another encoding
            # will not help, so stop immediately (result is None as before).
            return None
    return None
# scan_archive function · python · L264-L287 (24 LOC)dex-ingest.py
def scan_archive(root: str, extensions: Optional[set] = None) -> List[dict]:
    """
    Walk *root* recursively and describe every file with a wanted extension.

    Args:
        root: directory to scan. Directories whose names start with "." are
            pruned and never descended into.
        extensions: dotted extensions to include (e.g. {".md", ".txt"}).
            Defaults to PHASE1_EXTENSIONS. Matching is case-insensitive.
            Entries are lower-cased here because each file's extension is
            lower-cased before comparison, so an entry such as ".PY" could
            otherwise never match anything.

    Returns:
        One dict per matching file, excluding names in SKIP_FILENAMES, with
        keys "path" (joined full path), "rel_path" (relative to root),
        "filename", "extension" (lower-cased) and "folder" (name of the
        immediate parent directory).
    """
    if extensions is None:
        extensions = PHASE1_EXTENSIONS
    wanted = {e.lower() for e in extensions}
    files: List[dict] = []
    for r, dirs, names in os.walk(root):
        # Prune in place so os.walk skips hidden directories entirely.
        dirs[:] = [d for d in dirs if not d.startswith(".")]
        for n in names:
            if n in SKIP_FILENAMES:
                continue
            ext = os.path.splitext(n)[1].lower()
            if ext not in wanted:
                continue
            full = os.path.join(r, n)
            files.append(
                {
                    "path": full,
                    "rel_path": os.path.relpath(full, root),
                    "filename": n,
                    "extension": ext,
                    "folder": os.path.basename(r),
                }
            )
    return files
# ingest function · python · L293-L627 (335 LOC)dex-ingest.py
def ingest(archive_path: str, reset: bool = False, build_canon: bool = False, fast: bool = False,
collection: Optional[str] = None, ext_filter: Optional[set] = None,
nominated_by: Optional[str] = None, skip_backup_check: bool = False) -> None:
import chromadb
ext_list = ", ".join(sorted(ext_filter if ext_filter else PHASE1_EXTENSIONS))
print("\n" + "=" * 60)
print(" DEX JR. RAG PIPELINE - PHASE 1 INGESTION")
print(f" Archive: {archive_path}")
if collection:
print(f" Mode: SCOPED COLLECTION -> {collection}")
else:
print(f" Mode: {'BUILD CANON' if build_canon else 'NORMAL'}")
print(f" Extensions: {ext_list}")
if nominated_by:
print(f" Nominated by: {nominated_by}")
print("=" * 60)
# Generate ingest run id (HHMMSS precision to avoid same-minute collisions)
ingest_run_id = f"manual_{datetime.now(timezone.utc).strftime('%Y-%m-%d_%H%M%S')}"
print(f" Ingest run id: {ingest_run_id}")
Open data scored by Repobility · https://repobility.com
main function · python · L630-L685 (56 LOC)dex-ingest.py
def main() -> None:
p = argparse.ArgumentParser(description="DEX JR RAG - Ingest")
p.add_argument("--path", type=str, required=True, help="Archive path")
p.add_argument("--reset", action="store_true", help="Reset collections")
p.add_argument(
"--build-canon",
action="store_true",
help="Backfill CANON only; do not expand RAW",
)
p.add_argument(
"--collection",
type=str,
default=None,
help="Route ingest to a named scoped collection (e.g. dex_code, ext_reference). Bypasses RAW/CANON routing.",
)
p.add_argument(
"--ext-filter",
nargs="+",
default=None,
help="Limit ingest to specific extensions (e.g. --ext-filter .py .cs .ts). Dot prefix required.",
)
p.add_argument(
"--nominated-by",
type=str,
default=None,
help="Tag chunks with nominating seat(s) for ext_ collections (e.g. '1002,1004').",
)
p.add_argument(
"--fast",chunk_text function · python · L40-L53 (14 LOC)dex-ingest-text.py
def chunk_text(text: str, chunk_size: int = CHUNK_SIZE_CHARS,
               overlap: int = CHUNK_OVERLAP_CHARS) -> list[str]:
    """
    Simple character-based chunker with overlap. Returns list of strings.

    Each chunk is ``text[start:start + chunk_size]``, and consecutive chunks
    share ``overlap`` characters. Text no longer than ``chunk_size`` comes
    back as a single chunk.

    Raises:
        ValueError: when the text needs splitting but ``chunk_size < 1`` or
            ``overlap >= chunk_size``. The window advances by
            ``chunk_size - overlap`` per step, so those settings made the
            loop run forever instead of finishing.
    """
    n = len(text)
    if n <= chunk_size:
        return [text]
    if chunk_size < 1 or overlap >= chunk_size:
        raise ValueError(
            f"invalid chunking params: chunk_size={chunk_size}, overlap={overlap} "
            "(need chunk_size >= 1 and overlap < chunk_size)"
        )
    chunks = []
    start = 0
    while start < n:
        end = start + chunk_size
        chunks.append(text[start:end])
        if end >= n:
            break
        start = end - overlap
    return chunks
# get_embedding function · python · L56-L66 (11 LOC)dex-ingest-text.py
def get_embedding(text: str) -> list[float]:
    """
    Embed *text* via the local Ollama embeddings endpoint (EMBEDDING_MODEL).

    Single source of truth for embedding calls in this script; kept in line
    with dex-ingest.py:45-46,105-113 and dex_weights.py:23,62-69. Nothing is
    swallowed: HTTP errors raise from raise_for_status(), and a response
    without an "embedding" key raises KeyError.
    """
    # Imported locally, so requests is only loaded when an embedding is requested.
    import requests

    resp = requests.post(
        "http://localhost:11434/api/embeddings",
        json={"model": EMBEDDING_MODEL, "prompt": text},
        timeout=60,
    )
    resp.raise_for_status()
    payload = resp.json()
    return payload["embedding"]
# infer_source_type function · python · L69-L82 (14 LOC)dex-ingest-text.py
def infer_source_type(filename: str) -> str:
    """
    Infer source_type from filename per STD-DDL-METADATA-001 §"Inference rules".

    Rules are checked in priority order; the first match wins:
      1. "DDLCouncilReview_" prefix             -> "council_review"
      2. "SYNTH-" prefix or "_SYNTH." substring -> "council_synthesis"
      3. ADR-/STD-/PRO-/CR- prefix              -> "governance"
      4. sweep_*.md or audit_*.md               -> "system_telemetry"
      otherwise                                 -> "unknown"
    """
    if filename.startswith("DDLCouncilReview_"):
        return "council_review"
    if filename.startswith("SYNTH-") or "_SYNTH." in filename:
        return "council_synthesis"
    if filename.startswith(("ADR-", "STD-", "PRO-", "CR-")):
        return "governance"
    if filename.endswith(".md") and filename.startswith(("sweep_", "audit_")):
        return "system_telemetry"
    return "unknown"
# ingest_text_file function · python · L85-L201 (117 LOC)dex-ingest-text.py
def ingest_text_file(
source_path: Path,
collection_name: str,
dry_run: bool = False,
) -> dict:
"""
Ingest a single plain-text file end-to-end through the pipeline.
Returns a summary dict with keys: file, chunks_written, source_type,
collection, verified, moved_to, dry_run.
"""
# Step 1: Validate input
if not source_path.exists():
raise FileNotFoundError(f"source file does not exist: {source_path}")
if source_path.suffix.lower() not in (".txt", ".md"):
raise ValueError(
f"dex-ingest-text only handles .txt and .md, got: {source_path.suffix}"
)
# Step 2: Read file
text = source_path.read_text(encoding="utf-8", errors="replace")
if not text.strip():
raise ValueError(f"source file is empty: {source_path}")
# Step 3: Chunk
chunks = chunk_text(text)
chunk_total = len(chunks)
print(f" Chunked into {chunk_total} chunks")
# Step 3.5: Backup pre-flight (Trigger 3 of STD-Dmain function · python · L204-L227 (24 LOC)dex-ingest-text.py
def main():
    """
    Command-line entry point: ingest one .txt/.md file into a ChromaDB collection.

    Echoes the resolved source path, target collection and dry-run flag,
    delegates to ingest_text_file(), then prints each key/value pair of the
    returned summary between horizontal rules.
    """
    parser = argparse.ArgumentParser(description="Plain-text ingest for dex-rag pipeline")
    parser.add_argument("source", help="Path to .txt or .md file to ingest")
    parser.add_argument(
        "--collection",
        default="dex_test",
        help="Target ChromaDB collection (default: dex_test)",
    )
    parser.add_argument(
        "--dry-run",
        action="store_true",
        help="Validate and chunk only — no embeddings, no writes, no move",
    )
    args = parser.parse_args()

    source = Path(args.source).resolve()
    print(f"Ingesting: {source}")
    print(f"Target collection: {args.collection}")
    print(f"Dry run: {args.dry_run}")
    print()

    summary = ingest_text_file(source, args.collection, dry_run=args.dry_run)

    rule = "─" * 60
    print()
    for banner_line in (rule, "SUMMARY", rule):
        print(banner_line)
    for key, value in summary.items():
        print(f" {key:20} {value}")
    print()
# log function · python · L41-L45 (5 LOC)dex-ocr.py
def log(message: str):
    """Append *message* to LOG_FILE as one line prefixed with a local "[YYYY-mm-dd HH:MM:SS]" stamp."""
    stamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    entry = f"[{stamp}] {message}"
    with open(LOG_FILE, "a", encoding="utf-8") as log_fh:
        log_fh.write(entry + "\n")
# get_image_files function · python · L48-L57 (10 LOC)dex-ocr.py
def get_image_files(directory: str) -> list[Path]:
    """
    Return the image files directly inside *directory* (not recursive), sorted.

    A file qualifies when its lower-cased suffix is in IMAGE_EXTENSIONS.
    A missing directory is fatal: an error is printed and the process exits
    with status 1.
    """
    folder = Path(directory)
    if not folder.exists():
        print(f"ERROR: Directory not found: {directory}")
        sys.exit(1)
    return sorted(
        entry
        for entry in folder.iterdir()
        if entry.is_file() and entry.suffix.lower() in IMAGE_EXTENSIONS
    )
# All rows scored by the Repobility analyzer (https://repobility.com)
build_header function · python · L68-L78 (11 LOC)dex-ocr.py
def build_header(image_path: Path, img: Image.Image) -> str:
    """
    Build the metadata preamble placed before an image's OCR text.

    Lists the file name, resolved path, pixel dimensions, the local time of
    the OCR run and a fixed TYPE of "screenshot", followed by a 60-character
    "=" rule and a blank line.
    """
    width, height = img.size
    stamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    parts = [
        f"SOURCE: {image_path.name}\n",
        f"FULL_PATH: {image_path.resolve()}\n",
        f"DIMENSIONS: {width}x{height}px\n",
        f"OCR_TIMESTAMP: {stamp}\n",
        f"TYPE: screenshot\n",
        f"{'=' * 60}\n\n",
    ]
    return "".join(parts)
# run_ocr function · python · L81-L93 (13 LOC)dex-ocr.py
def run_ocr(image_path: Path) -> tuple[str, str | None]:
    """
    OCR one image and prefix the text with its metadata header.

    Returns:
        (text, error). On success ``text`` is the header plus the stripped
        OCR output and ``error`` is None. On any failure ``text`` is "" and
        ``error`` is the exception message.
    """
    try:
        # Image.open() is lazy and keeps the source file open until the
        # image object is closed or collected. The context manager releases
        # the handle deterministically, which matters if the screenshot is
        # moved or deleted afterwards (Windows refuses to move open files).
        with Image.open(image_path) as img:
            # Convert to RGB if needed (handles RGBA, palette mode, etc.)
            ocr_img = img if img.mode in ("RGB", "L") else img.convert("RGB")
            text = pytesseract.image_to_string(ocr_img, config="--psm 3")
            header = build_header(image_path, ocr_img)
        return header + text.strip(), None
    except Exception as e:
        return "", str(e)
# trigger_ingest function · python · L100-L108 (9 LOC)dex-ocr.py
def trigger_ingest():
    """Run INGEST_SCRIPT with --build-canon over CANON_PATH; print a warning if it exits non-zero."""
    print("\n Triggering canon ingestion...")
    cmd = [sys.executable, INGEST_SCRIPT, "--path", CANON_PATH, "--build-canon"]
    # Output is not captured, so the child's progress streams straight to this console.
    completed = subprocess.run(cmd, capture_output=False)
    if completed.returncode:
        print(" WARNING: Ingest returned non-zero exit code.")
# main function · python · L113-L205 (93 LOC)dex-ocr.py
def main():
parser = argparse.ArgumentParser(
description="dex-ocr.py — Convert screenshots to .txt for Dex Jr. RAG corpus"
)
parser.add_argument("--dir", required=True, help="Input folder of images")
parser.add_argument("--out-dir", default=DEFAULT_OUT_DIR, help="Output folder for .txt files")
parser.add_argument("--ingest", action="store_true", help="Trigger canon ingestion after conversion")
parser.add_argument("--preview", action="store_true", help="Print OCR output without saving")
parser.add_argument("--skip-processed", action="store_true", help="Skip files already converted")
args = parser.parse_args()
out_dir = Path(args.out_dir)
if not args.preview:
out_dir.mkdir(parents=True, exist_ok=True)
images = get_image_files(args.dir)
total = len(images)
if total == 0:
print(f"No image files found in: {args.dir}")
sys.exit(0)
print(f"\n dex-ocr.py — ScreenBackupFailedError class · python · L33-L35 (3 LOC)dex_pipeline.py
class BackupFailedError(Exception):
    """
    Raised when a backup attempt ran but failed validation, or when the most
    recent backup is structurally corrupt (sqlite unreadable, etc.).
    """
# build_chunk_metadata function · python · L62-L167 (106 LOC)dex_pipeline.py
def build_chunk_metadata(
source_file: str,
source_path: str,
source_type: str,
ingest_run_id: str,
chunk_index: int,
chunk_total: int,
ingested_at: Optional[str] = None,
) -> dict:
"""
Build a validated metadata dict for a single chunk.
Per STD-DDL-METADATA-001. All seven fields are mandatory.
Validates per the standard's validation rules.
Raises ValueError on validation failure with a specific message
naming the field that failed.
Args:
source_file: Filename only, not the path. e.g. "DDLCouncilReview_AntiPractice.txt"
source_path: Full absolute path at time of ingest.
source_type: One of VALID_SOURCE_TYPES. Use "unknown" if uncertain.
ingest_run_id: e.g. "sweep_2026-04-11_0300", "manual_2026-04-11_1842",
"backfill_2026-04-11", or "test_<purpose>_2026-04-11".
chunk_index: 0-indexed position within source file.
chunk_total: total number of chunks from this soverify_ingest function · python · L170-L208 (39 LOC)dex_pipeline.py
def verify_ingest(
collection_name: str,
source_file: str,
expected_chunk_count: int,
) -> tuple[bool, int]:
"""
Verify that the expected number of chunks for a given source_file
landed in the target collection.
Read-only operation. Queries by metadata filter, no writes.
Args:
collection_name: e.g. "dex_canon", "ddl_archive"
source_file: filename to query for (matches metadata source_file)
expected_chunk_count: how many chunks the ingest path attempted to write
Returns:
Tuple of (success: bool, actual_count: int).
success is True iff actual_count == expected_chunk_count.
"""
# Use the same client pattern as dex_weights.py
# Local import to avoid hard dependency at module-load time
from dex_weights import get_client
client = get_client()
try:
collection = client.get_collection(collection_name)
except Exception as e:
# Collection doesn't exist or can't be reached — move_to_staging function · python · L211-L277 (67 LOC)dex_pipeline.py
def move_to_staging(source_path: str) -> Path:
"""
Atomically move a file from DDL_Ingest to DDL_Staging,
preserving its relative path structure within the inbox.
A file at DDL_Ingest/foo/bar/baz.txt becomes
DDL_Staging/foo/bar/baz.txt — same relative path, different root.
If the destination directory doesn't exist, it's created. If a
file already exists at the destination (collision), the function
raises FileExistsError rather than overwriting — collisions
require operator review.
Per ADR-INGEST-PIPELINE-001 §"Three folders, three states":
nothing is ever deleted. This function only moves forward.
Args:
source_path: Absolute path to a file inside DDL_Ingest.
Returns:
Path object pointing to the new location in DDL_Staging.
Raises:
ValueError: if source_path is not inside DDL_INGEST_ROOT.
FileNotFoundError: if source_path doesn't exist.
FileExistsError: if destination already exists (nIf a scraper extracted this row, it came from Repobility (https://repobility.com)
ensure_backup_current function · python · L280-L403 (124 LOC)dex_pipeline.py
def ensure_backup_current(
expected_write_chunks: int = 0,
force_check: bool = False,
dry_run: bool = False,
) -> dict:
"""
Ensure the corpus backup is current per STD-DDL-BACKUP-001 §"Trigger 3".
Trigger 3 is the pre-write gate. This helper is the in-pipeline gate
that ingest paths call before writing chunks. dex-backup.py is the
single source of truth for trigger logic and the actual copy — this
helper just shells out to it via subprocess and interprets the result.
Args:
expected_write_chunks: how many chunks the caller intends to write.
If > 100, Trigger 5 fires regardless of other triggers.
force_check: if True, run a backup regardless of trigger state.
(Interpreted as "force-run a backup," consistent with the
self-test "force_check=True → backup_ran=True" expectation.)
dry_run: if True, --dry-run is added to the dex-backup.py
invocation. Trigger logic still runs and thget_embedding function · python · L14-L21 (8 LOC)dex-query.py
def get_embedding(text: str):
    """
    Return the Ollama embedding (EMBED_MODEL at OLLAMA_URL) for *text*.

    HTTP errors raise from raise_for_status(); a response lacking the
    "embedding" key raises KeyError.
    """
    response = requests.post(
        OLLAMA_URL,
        json={"model": EMBED_MODEL, "prompt": text},
        timeout=60,
    )
    response.raise_for_status()
    data = response.json()
    return data["embedding"]
# main function · python · L23-L122 (100 LOC)dex-query.py
def main():
parser = argparse.ArgumentParser()
parser.add_argument("query", nargs="?", help="Search query")
parser.add_argument("--top", type=int, default=DEFAULT_TOP_N)
parser.add_argument("--raw", action="store_true", help="Search RAW archive instead of canon (unweighted)")
parser.add_argument("--external", action="store_true", help="Include ext_canon and ext_archive in search")
parser.add_argument("--stats", action="store_true", help="Show DB stats")
parser.add_argument("--weight-stats", action="store_true", help="Show source weight table and exit")
args = parser.parse_args()
# Weight table — no DB needed
if args.weight_stats:
print_weight_stats()
return
# DB stats — open client here only
if args.stats:
client = chromadb.PersistentClient(path=CHROMA_DIR)
canon = client.get_collection("dex_canon").count()
archive = client.get_collection("ddl_archive").count()
print("\n================ get_embedding function · python · L71-L74 (4 LOC)dex-search-api.py
def get_embedding(text):
    """Return the Ollama embedding of *text* (30 s timeout); HTTP errors and a missing "embedding" key raise."""
    payload = {"model": EMBED_MODEL, "prompt": text}
    response = req.post(OLLAMA_URL, json=payload, timeout=30)
    response.raise_for_status()
    body = response.json()
    return body["embedding"]
# get_rag_context function · python · L76-L88 (13 LOC)dex-search-api.py
def get_rag_context(query, collection, top_n=3):
    """
    Pull relevant MindFrame context from the corpus.

    Embeds *query*, asks *collection* for its ``top_n`` nearest documents,
    drops empty ones, and joins the rest with a "---" separator line.

    Context only enriches the prompt, so any failure still degrades to ""
    instead of failing the request. However, the failure is now printed,
    and KeyboardInterrupt/SystemExit are no longer swallowed; the previous
    bare ``except:`` caught those too and hid every error silently.
    """
    try:
        embedding = get_embedding(query)
        results = collection.query(
            query_embeddings=[embedding],
            n_results=top_n,
            include=["documents"],
        )
        chunks = [doc for doc in results["documents"][0] if doc]
        return "\n\n---\n\n".join(chunks)
    except Exception as e:
        print(f"WARN: RAG context lookup failed (continuing without context): {e}")
        return ""
# ChatMessage class · python · L109-L111 (3 LOC)dex-search-api.py
class ChatMessage(BaseModel):
    """One turn of a chat conversation as received by the API."""

    role: str  # speaker of the turn; mindframe_chat looks for "user"
    content: str  # message text
# root function · python · L117-L120 (4 LOC)dex-search-api.py
def root():
    """Status payload: service name, status, version, and chunk counts per collection (0 if unavailable)."""
    canon_count = canon.count() if canon else 0
    archive_count = archive.count() if archive else 0
    return {
        "service": "DDL API",
        "status": "online",
        "version": "0.5.0",
        "canon": canon_count,
        "archive": archive_count,
    }
# search function · python · L123-L133 (11 LOC)dex-search-api.py
def search(q: str = Query(..., min_length=2), top: int = Query(5, ge=1, le=20), corpus: str = Query("canon")):
    """
    Semantic search over one corpus.

    ``corpus == "canon"`` searches the canon collection; any other value
    searches the archive collection. Returns up to ``top`` hits. Each hit
    has the first 500 characters of the chunk, its ``source_file`` metadata
    ("?" when absent) and the distance rounded to 2 decimals.
    """
    # NOTE(review): unknown corpus names silently route to the archive;
    # kept as-is for backward compatibility with existing callers.
    collection = canon if corpus == "canon" else archive
    if not collection:
        return {"error": f"'{corpus}' not found", "results": []}
    embedding = get_embedding(q)
    results = collection.query(query_embeddings=[embedding], n_results=top, include=["documents", "metadatas", "distances"])
    hits = []
    # Chroma may return None for a chunk stored without metadata or document
    # text. Treat those as empty instead of crashing on .get() or slicing.
    for doc, meta, dist in zip(results["documents"][0], results["metadatas"][0], results["distances"][0]):
        hits.append({
            "text": (doc or "")[:500],
            "source": (meta or {}).get("source_file", "?"),
            "distance": round(dist, 2),
        })
    return {"query": q, "corpus": corpus, "count": len(hits), "results": hits}
# Repobility (the analyzer behind this table) · https://repobility.com
mindframe_chat function · python · L136-L173 (38 LOC)dex-search-api.py
def mindframe_chat(request: ChatRequest):
"""MindFrame calibration chat. Sends conversation to Ollama with MindFrame system prompt + RAG context."""
# Get the latest user message for RAG context
latest_user_msg = ""
for msg in reversed(request.messages):
if msg.role == "user":
latest_user_msg = msg.content
break
# Pull relevant MindFrame context from corpus
rag_context = ""
if canon and latest_user_msg:
rag_context = get_rag_context("MindFrame calibration " + latest_user_msg, canon, top_n=3)
# Build system prompt with RAG context
system_prompt = MINDFRAME_SYSTEM
if rag_context:
system_prompt += f"\n\nRELEVANT DDL CONTEXT (from the archive):\n{rag_context}\n\nUse this context to inform your questions and observations, but don't quote it directly."
# Build Ollama messages
ollama_messages = [{"role": "system", "content": system_prompt}]
for msg in request.messages:
run function · python · L12-L22 (11 LOC)dex-setup.py
def run(cmd, label):
    """
    Run one setup step as a shell command and report the outcome.

    Args:
        cmd: command-line string, executed through the shell (shell=True).
        label: human-readable step name used in the banner and result line.

    Returns:
        True if the command exited with status 0, otherwise False.

    On failure the command's stderr is echoed. If stderr is empty, stdout is
    echoed instead, because some tools report errors there; previously the
    step then failed with no explanation at all.
    """
    print(f"\n{'='*60}")
    print(f" {label}")
    print(f"{'='*60}")
    result = subprocess.run(cmd, shell=True, capture_output=True, text=True)
    ok = result.returncode == 0
    if ok:
        print(f" ✓ {label} — OK")
    else:
        print(f" ✗ {label} — FAILED")
        detail = (result.stderr or "").strip() or (result.stdout or "").strip()
        print(f" {detail}")
    return ok
# main function · python · L24-L90 (67 LOC)dex-setup.py
def main():
print("\n" + "="*60)
print(" DDL RAG PIPELINE — PHASE 1 SETUP")
print(" Dex Jr. × Archive Integration")
print("="*60)
checks = []
# 1. Check Ollama is running
checks.append(run("ollama list", "Ollama is installed and running"))
# 2. Pull embedding model
checks.append(run("ollama pull nomic-embed-text", "Pull nomic-embed-text embedding model"))
# 3. Install Python dependencies
checks.append(run(
f"{sys.executable} -m pip install chromadb requests --quiet",
"Install ChromaDB and requests"
))
# 4. Verify imports
try:
import chromadb
print(f"\n ✓ ChromaDB version: {chromadb.__version__}")
checks.append(True)
except ImportError:
print("\n ✗ ChromaDB import failed")
checks.append(False)
try:
import requests
print(f" ✓ Requests available")
checks.append(True)
except ImportError:
print(f" ✗ Requests import failed")
scan_drop_folders function · python · L90-L109 (20 LOC)dex-sweep.py
def scan_drop_folders():
    """
    Find all ingestible files at the top level of every DROP_FOLDERS entry.

    Sub-directories are not descended into. A file qualifies when its
    lower-cased extension is in INGEST_EXTENSIONS. Missing drop folders are
    reported with a WARN line and skipped.

    Returns:
        A list of dicts with keys "source" (full path), "filename",
        "folder" (the drop folder it came from) and "size" (bytes).
    """
    found = []
    for folder in DROP_FOLDERS:
        if not os.path.exists(folder):
            # Rule 15 fix: surface missing drop folders instead of silent skip.
            print(f" WARN: drop folder not found (skipping): {folder}")
            continue
        for name in os.listdir(folder):
            full_path = os.path.join(folder, name)
            if not os.path.isfile(full_path):
                continue
            if os.path.splitext(name)[1].lower() not in INGEST_EXTENSIONS:
                continue
            found.append({
                "source": full_path,
                "filename": name,
                "folder": folder,
                "size": os.path.getsize(full_path),
            })
    return found
# classify_scanned_files function · python · L112-L135 (24 LOC)dex-sweep.py
def classify_scanned_files(files):
    """
    Partition scanned files into ``(user_files, ingest_reports)``.

    Per STD-DDL-SWEEPREPORT-001 v1.0, a file is an ingest report only when
    ALL of these hold:
      1. its parent folder is named "_sweep_reports"
      2. its filename starts with "ingest_report_"
      3. its filename ends with ".md"
    Everything else is a user file. Input order is kept in both lists.
    """
    def _is_ingest_report(entry):
        parent = os.path.basename(os.path.dirname(entry["source"]))
        name = entry["filename"]
        return (
            parent == "_sweep_reports"
            and name.startswith("ingest_report_")
            and name.endswith(".md")
        )

    user_files, ingest_reports = [], []
    for entry in files:
        bucket = ingest_reports if _is_ingest_report(entry) else user_files
        bucket.append(entry)
    return user_files, ingest_reports
# find_previous_report function · python · L138-L147 (10 LOC)dex-sweep.py
def find_previous_report():
    """
    Return the path of the most recent ingest_report_*.md in SWEEP_REPORTS_DIR.

    The greatest filename in plain string order counts as the most recent.
    This relies on the timestamp that leads each report's name. Returns None
    when the directory does not exist or contains no matching report.
    """
    if not os.path.exists(SWEEP_REPORTS_DIR):
        return None
    candidates = [
        name for name in os.listdir(SWEEP_REPORTS_DIR)
        if name.startswith("ingest_report_") and name.endswith(".md")
    ]
    if not candidates:
        return None
    return os.path.join(SWEEP_REPORTS_DIR, max(candidates))
# write_sweep_report function · python · L150-L298 (149 LOC)dex-sweep.py
def write_sweep_report(
ingest_run_id,
triggered_at,
files_ingested,
skipped_files,
errors,
ingestion_ok,
backup_ran,
backup_path,
outcome,
subprocess_output="",
):
"""
Compose STD-DDL-SWEEPREPORT-001 v1.0 markdown report.
Write to SWEEP_REPORTS_DIR. Return the written path, or None on failure.
Failure is WARN-level, does NOT raise.
"""
try:
os.makedirs(SWEEP_REPORTS_DIR, exist_ok=True)
now = datetime.datetime.now(datetime.timezone.utc)
ts = now.strftime("%Y-%m-%d_%H%M%S")
us = f"{now.microsecond:06d}"
fname = f"ingest_report_{ts}.{us}_{ingest_run_id}.md"
path = os.path.join(SWEEP_REPORTS_DIR, fname)
# Pipeline state (read-only ChromaDB query)
pipeline_state = {}
try:
from dex_weights import get_client
client = get_client()
for cname in ["dex_canon", "ddl_archive", "dex_code", "ext_creator"]:
try:
copy_to_corpus function · python · L301-L342 (42 LOC)dex-sweep.py
def copy_to_corpus(files, dry_run=False, temp_dir=None):
"""
Copy files to CANON_DIR (archival) + optionally to temp_dir (for ingest).
Step 22 Fix A: files are ingested from a small temp dir containing only
the new files, not from CANON_DIR (which has 5800+ historical files).
Files also go to CANON_DIR for permanent archival. Originals move to
_processed/ per existing behavior.
"""
os.makedirs(CANON_DIR, exist_ok=True)
copied = []
for f in files:
dest = os.path.join(CANON_DIR, f["filename"])
processed_dir = os.path.join(f["folder"], "_processed")
os.makedirs(processed_dir, exist_ok=True)
processed = os.path.join(processed_dir, f["filename"])
# Handle filename conflicts in CANON_DIR
if os.path.exists(dest):
ts = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
name, ext = os.path.splitext(f["filename"])
f["filename"] = f"{name}_{ts}{ext}"
dest = os.Open data scored by Repobility · https://repobility.com
run_ingestion function · python · L344-L390 (47 LOC)dex-sweep.py
def run_ingestion(ingest_path, dry_run=False):
"""
Trigger ingestion via subprocess to dex-ingest.py.
Step 22 Fix A: ingest_path is a temp dir containing only the new files,
NOT CANON_DIR (which has 5800+ files and would timeout). This makes
ingest fast and targeted.
Returns: (ok: bool, stderr_on_failure: str | None, stdout: str)
"""
cmd_args = ["python", INGEST_SCRIPT, "--path", str(ingest_path),
"--collection", "dex_canon", "--fast", "--skip-backup-check"]
if dry_run:
print(f" [DRY RUN] Would run: {' '.join(cmd_args)}")
return (True, None, "")
if not os.path.exists(INGEST_SCRIPT):
msg = f"Ingest script not found: {INGEST_SCRIPT}"
print(f" ERROR: {msg}")
return (False, msg, "")
try:
print(f" Running ingestion against {ingest_path}...")
ingest_env = {**os.environ, "PYTHONIOENCODING": "utf-8"}
result = subprocess.run(
cmd_args,
captulog_sweep function · python · L392-L430 (39 LOC)dex-sweep.py
def log_sweep(files_found, files_copied, ingestion_ok,
start_ts=None, end_ts=None,
backup_ran=None, backup_path=None,
error=None, recovery_hint=None,
subprocess_stderr=None, dry_run=False,
outcome=None, report_written=None,
report_write_error=None):
"""
Append a JSON line to dex-sweep-log.jsonl.
All new fields default to None so old readers tolerate the schema
evolution. Log write failures are non-blocking (Rule 15 compliance:
non-silent WARN, but must not prevent sweep completion).
"""
entry = {
"timestamp": datetime.datetime.now().isoformat(),
"start_ts": start_ts,
"end_ts": end_ts,
"files_found": len(files_found),
"files_copied": len(files_copied),
"filenames": [f["filename"] for f in files_copied],
"ingestion_triggered": len(files_copied) > 0,
"ingestion_success": ingestion_ok,
"backup_ran": backup_rsweep function · python · L435-L612 (178 LOC)dex-sweep.py
def sweep(dry_run=False):
"""
Run one sweep cycle per STD-DDL-SWEEPREPORT-001 v1.0.
Classification logic:
- If only ingest reports found (no user files): skip (no gate, no ingest, no report)
- If nothing found: skip
- Otherwise: gate → copy → ingest → report
Guarantees a log_sweep call via try/finally.
"""
start_ts = datetime.datetime.now().isoformat()
triggered_at = datetime.datetime.now(datetime.timezone.utc).isoformat()
ts = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
print(f"\n {'='*50}")
print(f" DEX JR AUTO-SWEEP v2.0 — {ts}")
print(f" {'='*50}")
print(f" Scanning {len(DROP_FOLDERS)} drop folder(s)...")
# State captured across try/finally
files: list = []
copied: list = []
ingestion_ok = False
backup_ran = None
backup_path = None
error = None
recovery_hint = None
subprocess_stderr = None
outcome = "no_files_found"
report_path = None
report_write_error = Nonwatch function · python · L617-L633 (17 LOC)dex-sweep.py
def watch(interval_minutes, dry_run=False):
    """
    Continuously watch for new files: run sweep(), then sleep, forever.

    Prints each configured drop folder once at start-up (flagging missing
    ones). Ctrl+C during a sweep or a sleep ends the loop with a message.
    Any other exception propagates to the caller.
    """
    print("\n DEX JR AUTO-SWEEP — Watch Mode")
    print(f" Checking every {interval_minutes} minute(s)")
    print(f" Drop folders: {len(DROP_FOLDERS)}")
    for folder in DROP_FOLDERS:
        status = "OK" if os.path.exists(folder) else "NOT FOUND"
        print(f" [{status}] {folder}")
    print(" Press Ctrl+C to stop\n")
    try:
        while True:
            sweep(dry_run=dry_run)
            time.sleep(interval_minutes * 60)
    except KeyboardInterrupt:
        print("\n Watch stopped.")
# run_self_tests function · python · L638-L700 (63 LOC)dex-sweep.py
def run_self_tests():
"""D8 self-tests per STD-DDL-SWEEPREPORT-001 classification predicate."""
print("Running self-tests...")
passed = 0
# Classification predicate: 6 cases
def make_file(source, filename):
return {"source": source, "filename": filename, "folder": os.path.dirname(source), "size": 100}
# Case 1: valid report (all 3 conditions true)
f1 = make_file(r"C:\DDL_Ingest\_sweep_reports\ingest_report_2026-04-12.md", "ingest_report_2026-04-12.md")
u, r = classify_scanned_files([f1])
assert len(r) == 1 and len(u) == 0, f"Case 1 failed: {u}, {r}"
print(" [OK] Case 1: valid report classified as report")
passed += 1
# Case 2: wrong parent folder
f2 = make_file(r"C:\DDL_Ingest\ingest_report_2026-04-12.md", "ingest_report_2026-04-12.md")
u, r = classify_scanned_files([f2])
assert len(u) == 1 and len(r) == 0, f"Case 2 failed"
print(" [OK] Case 2: wrong parent → user file")
passed += 1
# Case 3: wrong prmain function · python · L703-L719 (17 LOC)dex-sweep.py
def main():
    """CLI entry point: run self-tests, watch continuously, or do a single sweep."""
    parser = argparse.ArgumentParser(description="Dex Jr Auto-Sweep v2.0")
    parser.add_argument("--watch", action="store_true", help="Watch continuously")
    parser.add_argument("--interval", type=int, default=DEFAULT_INTERVAL, help="Minutes between checks")
    parser.add_argument("--dry-run", action="store_true", help="Preview without acting")
    parser.add_argument("--self-test", action="store_true", help="Run classification + report self-tests")
    args = parser.parse_args()

    # --self-test wins over --watch; with neither flag, run one sweep.
    if args.self_test:
        run_self_tests()
    elif args.watch:
        watch(args.interval, dry_run=args.dry_run)
    else:
        sweep(dry_run=args.dry_run)
# collection_exists function · python · L53-L58 (6 LOC)dex_weights.py
def collection_exists(client, name: str) -> bool:
    """
    Report whether *client* can open the collection called *name*.

    Any exception raised by ``client.get_collection``, not only "missing
    collection", is read as "does not exist".
    """
    try:
        client.get_collection(name)
    except Exception:
        return False
    return True
# embed function · python · L62-L69 (8 LOC)dex_weights.py
def embed(text: str) -> list[float]:
    """
    Return the embedding of *text* from the Ollama endpoint OLLAMA_EMBED_URL.

    HTTP errors raise from raise_for_status(); a response lacking the
    "embedding" key raises KeyError.
    """
    payload = {"model": EMBED_MODEL, "prompt": text}
    resp = requests.post(OLLAMA_EMBED_URL, json=payload, timeout=60)
    resp.raise_for_status()
    body = resp.json()
    return body["embedding"]
# All rows scored by the Repobility analyzer (https://repobility.com)
calculate_weight function · python · L73-L95 (23 LOC)dex_weights.py
def calculate_weight(collection_name: str, metadata: dict) -> float:
base = COLLECTIONS.get(collection_name, {}).get("base_weight", 0.65)
file_type = metadata.get("file_type", "").lower()
type_mult = FILE_TYPE_WEIGHTS.get(file_type, 1.00)
filename = metadata.get("filename", "").lower()
source = metadata.get("source_file", "").lower()
# Pattern-based type detection if file_type not set cleanly
if file_type == "" or file_type not in FILE_TYPE_WEIGHTS:
if any(x in filename for x in ["cr-llms", "cr-site", "cr-ops", "cr-infra", "cr-nuet"]):
type_mult = FILE_TYPE_WEIGHTS["council_review"]
elif any(x in filename for x in ["std-", "pro-c2d", "pro-"]):
type_mult = FILE_TYPE_WEIGHTS["protocol"]
elif any(x in source for x in ["98_threads", "thread"]):
type_mult = FILE_TYPE_WEIGHTS["thread_export"]
elif any(x in source for x in ["note", "ios_note"]):
type_mult = FILE_TYPE_WEIGHTS["iscore_result function · python · L98-L105 (8 LOC)dex_weights.py
def score_result(distance: float, weight: float) -> float:
    """
    Turn a vector distance into a weighted relevance score (higher is better).

    similarity = 1 / (1 + distance) gives 1.0 at distance 0 and decays toward
    0 as the distance grows, whatever the distance scale. The similarity is
    multiplied by *weight* and rounded to 6 decimal places.
    """
    return round((1.0 / (1.0 + distance)) * weight, 6)
# weighted_query function · python · L109-L168 (60 LOC)dex_weights.py
def weighted_query(
query_text: str,
n_results: int = 5,
include_external: bool = False,
collections: Optional[list] = None,
where_filter: Optional[dict] = None
) -> list[dict]:
client = get_client()
query_embedding = embed(query_text)
if collections:
target_collections = collections
else:
target_collections = ["dex_canon", "ddl_archive", "ext_creator", "ext_reference", "dex_code"]
if include_external:
target_collections += ["ext_canon", "ext_archive"]
fetch_per_collection = max(n_results * 3, 15)
all_results = []
for coll_name in target_collections:
if not collection_exists(client, coll_name):
continue
try:
collection = client.get_collection(coll_name)
query_kwargs = {
"query_embeddings": [query_embedding],
"n_results": fetch_per_collection,
"include": ["documents", "metadat