← back to dropdownlogistics__dex-rag

Function bodies 161 total

All specs Real LLM only Function bodies
convert_vcf function · python · L273-L313 (41 LOC)
dex-convert.py
def convert_vcf(file_path: Path, out_dir: Path) -> list[Path]:
    """Convert VCF contacts to readable text."""
    converted_date = datetime.now().strftime("%Y-%m-%d")
    header = source_header(str(file_path), "vcf-contacts", converted_date)

    try:
        with open(file_path, encoding="utf-8", errors="replace") as f:
            raw = f.read()
    except Exception as e:
        print(f"  [WARN] VCF read error: {e}")
        return []

    contacts = []
    current = []
    for line in raw.splitlines():
        if line.startswith("BEGIN:VCARD"):
            current = []
        elif line.startswith("END:VCARD"):
            contacts.append(current)
            current = []
        else:
            current.append(line)

    lines = [f"GOOGLE CONTACTS — {len(contacts)} contacts\n"]
    for i, contact in enumerate(contacts):
        contact_lines = [f"--- Contact {i+1} ---"]
        for field in contact:
            if ":" in field:
                key, _, val = field.partition(":")
convert_mbox function · python · L317-L396 (80 LOC)
dex-convert.py
def convert_mbox(file_path: Path, out_dir: Path, max_emails: int = 0) -> list[Path]:
    """Convert MBOX to individual email text files. Groups into chunks."""
    converted_date = datetime.now().strftime("%Y-%m-%d")
    ensure_dir(out_dir)

    print(f"\n  Processing MBOX: {file_path.name}")
    print(f"  This may take a while for large files...")

    try:
        mbox = mailbox.mbox(str(file_path))
    except Exception as e:
        print(f"  [FAIL] Could not open MBOX: {e}")
        return []

    output_files = []
    batch        = []
    batch_num    = 1
    batch_size   = 500  # emails per output file
    count        = 0

    for i, message in enumerate(mbox):
        if max_emails and i >= max_emails:
            break

        try:
            date    = str(message.get("Date", ""))
            subject = str(message.get("Subject", "(no subject)"))
            sender  = str(message.get("From", ""))
            to      = str(message.get("To", ""))

            # Get text body
 
convert_facebook_messages function · python · L400-L445 (46 LOC)
dex-convert.py
def convert_facebook_messages(fb_dir: Path, out_dir: Path) -> list[Path]:
    """Convert Facebook message JSON exports to text."""
    converted_date = datetime.now().strftime("%Y-%m-%d")
    output_files   = []

    msg_dir = fb_dir / "messages"
    if not msg_dir.exists():
        msg_dir = fb_dir  # try the dir itself

    json_files = list(msg_dir.rglob("message_*.json"))
    if not json_files:
        json_files = list(msg_dir.rglob("*.json"))

    print(f"  Found {len(json_files)} Facebook message JSON files")

    for jf in json_files:
        try:
            with open(jf, encoding="utf-8", errors="replace") as f:
                data = json.load(f)
        except Exception:
            continue

        participants = data.get("participants", [])
        participant_names = [p.get("name", "?") for p in participants]
        messages = data.get("messages", [])

        lines = [
            source_header(str(jf), "facebook-messages", converted_date),
            f"CONVERSATION:
chunk_file function · python · L449-L466 (18 LOC)
dex-convert.py
def chunk_file(content: str, stem: str, out_dir: Path,
               chunk_size: int, file_type: str) -> list[Path]:
    """Split large content into numbered chunk files.

    Each chunk is handed to ``write_output`` together with its destination
    path and a "<stem> chunk i/total" label.

    Args:
        content: Full text to split.
        stem: Base name used for every chunk file.
        out_dir: Directory the chunk files are written into.
        chunk_size: Maximum number of characters per chunk; must be positive.
        file_type: Accepted for interface compatibility; not used here.

    Returns:
        Paths of the chunk files, in order.

    Raises:
        ValueError: If ``chunk_size`` is not positive.
    """
    if chunk_size <= 0:
        raise ValueError(f"chunk_size must be positive, got {chunk_size}")

    # Ceiling division so an exact multiple of chunk_size is not over-counted
    # (the "NNNofMMM" part of the file name must match the files written).
    # Empty content still produces a single, empty chunk.
    total_chunks = max(1, -(-len(content) // chunk_size))
    print(f"  Chunking {stem} → {total_chunks} files ({chunk_size/1000:.0f}K chars each)")

    chunks = []
    for i in range(total_chunks):
        start = i * chunk_size
        chunk = content[start:start + chunk_size]
        out_path = out_dir / f"{stem}_chunk_{i+1:03d}of{total_chunks:03d}.txt"
        write_output(chunk, out_path, f"{stem} chunk {i+1}/{total_chunks}")
        chunks.append(out_path)

    return chunks
copy_to_canon function · python · L470-L485 (16 LOC)
dex-convert.py
def copy_to_canon(files: list[Path], canon_dir: str = CANON_FOLDER):
    """Copy each converted file into the canon folder, reporting per-file status.

    Prints a warning and returns without copying when ``canon_dir`` does not
    exist. A failed copy is reported and does not stop the remaining files.
    """
    import shutil

    target_root = Path(canon_dir)
    if not target_root.exists():
        print(f"  [WARN] Canon folder not found: {canon_dir}")
        return

    print(f"\n  Copying {len(files)} files to canon...")
    for src in files:
        destination = target_root / src.name
        try:
            shutil.copy2(src, destination)
            print(f"  [OK] {src.name}")
        except Exception as e:
            print(f"  [WARN] Copy failed for {src.name}: {e}")
main function · python · L489-L603 (115 LOC)
dex-convert.py
def main():
    parser = argparse.ArgumentParser(description="dex-convert.py — Format converter for Dex Jr. corpus")
    parser.add_argument("--file",      help="Single file to convert")
    parser.add_argument("--dir",       help="Directory to convert")
    parser.add_argument("--ext",       help="File extension filter for --dir (e.g. html, csv)")
    parser.add_argument("--type",      help="Force conversion type: html, csv, reddit-csv, json, vcf, facebook, mbox")
    parser.add_argument("--all-csv",   action="store_true", help="Convert all CSVs in dir as Reddit exports")
    parser.add_argument("--mbox",      help="MBOX file path (Gmail)")
    parser.add_argument("--chunk",     type=int, default=0,
                        help=f"Chunk size in chars (0=no chunking, default threshold={DEFAULT_CHUNK_SIZE:,})")
    parser.add_argument("--out-dir",   default=DEFAULT_OUT_DIR, help="Output directory")
    parser.add_argument("--to-canon",  action="store_true", help="Copy results to canon fo
load_env function · python · L108-L117 (10 LOC)
dex-council.py
def load_env():
    """Read KEY=VALUE pairs from ENV_FILE into a dict.

    Blank lines, lines starting with "#", and lines without "=" are ignored.
    Keys and values are whitespace-stripped. Returns an empty dict when the
    file does not exist.
    """
    if not os.path.exists(ENV_FILE):
        return {}

    keys = {}
    with open(ENV_FILE, "r") as f:
        for raw in f:
            entry = raw.strip()
            if not entry or entry.startswith("#") or "=" not in entry:
                continue
            # Split on the first "=" only, so values may contain "=".
            name, _, value = entry.partition("=")
            keys[name.strip()] = value.strip()
    return keys
Source: Repobility analyzer · https://repobility.com
retrieve_context function · python · L185-L214 (30 LOC)
dex-council.py
def retrieve_context(query, top_k=TOP_K, use_raw=False):
    try:
        import chromadb
        client = chromadb.PersistentClient(path=CHROMA_DIR)
        col_name = RAW_COLLECTION if use_raw else CANON_COLLECTION
        collection = client.get_collection(col_name)
        r = requests.post(
            OLLAMA_EMBED_URL,
            json={"model": EMBED_MODEL, "prompt": query},
            timeout=60,
        )
        r.raise_for_status()
        embedding = r.json().get("embedding")
        if not embedding:
            return ""
        results = collection.query(
            query_embeddings=[embedding],
            n_results=top_k,
            include=["documents", "metadatas"],
        )
        chunks = []
        if results and results["documents"]:
            for i, doc in enumerate(results["documents"][0]):
                meta = results["metadatas"][0][i] if results["metadatas"] else {}
                source = meta.get("source_file", "unknown")
                chunks.a
query_local function · python · L219-L240 (22 LOC)
dex-council.py
def query_local(model_id, prompt, timeout=DEFAULT_TIMEOUT):
    """POST a non-streaming generation request to OLLAMA_URL.

    Returns a dict with "response", "elapsed" (seconds, one decimal) and
    "error". On any exception the response is None, elapsed is 0 and the
    error holds the exception text.
    """
    payload = {
        "model": model_id,
        "prompt": prompt,
        "stream": False,
        "options": {"temperature": 0.3, "num_ctx": 16384},
    }
    try:
        started = time.time()
        reply = requests.post(OLLAMA_URL, json=payload, timeout=timeout)
        reply.raise_for_status()
        took = time.time() - started
        answer = reply.json().get("response", "[No response]")
        return {"response": answer, "elapsed": round(took, 1), "error": None}
    except Exception as e:
        return {"response": None, "elapsed": 0, "error": str(e)}
query_gemini function · python · L242-L258 (17 LOC)
dex-council.py
def query_gemini(prompt, api_key, model_id="gemini-2.5-flash", timeout=120):
    """Send a single-turn prompt to the Gemini generateContent endpoint.

    Args:
        prompt: Text sent as the only content part.
        api_key: Gemini API key.
        model_id: Model name placed in the request URL.
        timeout: Request timeout in seconds.

    Returns:
        Dict with "response", "elapsed" (seconds, one decimal) and "error".
        On any exception the response is None, elapsed is 0 and the error
        holds the exception text.
    """
    # The key travels in a header rather than the query string: exception
    # messages from requests include the URL, and the error text is returned
    # to callers, so a key in the URL would leak into that text.
    url = f"https://generativelanguage.googleapis.com/v1beta/models/{model_id}:generateContent"
    try:
        start = time.time()
        r = requests.post(
            url,
            json={"contents": [{"parts": [{"text": prompt}]}]},
            headers={
                "Content-Type": "application/json",
                "x-goog-api-key": api_key,
            },
            timeout=timeout,
        )
        r.raise_for_status()
        data = r.json()
        # Treat an empty "candidates"/"parts" list like a missing one instead
        # of failing with IndexError.
        candidates = data.get("candidates") or [{}]
        parts = candidates[0].get("content", {}).get("parts") or [{}]
        text = parts[0].get("text", "[No response]")
        elapsed = time.time() - start
        return {"response": text, "elapsed": round(elapsed, 1), "error": None}
    except Exception as e:
        return {"response": None, "elapsed": 0, "error": str(e)}
query_openai_compatible function · python · L260-L283 (24 LOC)
dex-council.py
def query_openai_compatible(prompt, api_key, url, model_id, timeout=120):
    """POST a single user message to a chat-completions style endpoint.

    Returns a dict with "response", "elapsed" (seconds, one decimal) and
    "error". On any exception the response is None, elapsed is 0 and the
    error holds the exception text.
    """
    body = {
        "model": model_id,
        "messages": [{"role": "user", "content": prompt}],
        "temperature": 0.3,
        "max_tokens": 4096,
    }
    auth_headers = {
        "Content-Type": "application/json",
        "Authorization": f"Bearer {api_key}",
    }
    try:
        started = time.time()
        reply = requests.post(url, json=body, headers=auth_headers, timeout=timeout)
        reply.raise_for_status()
        first_choice = reply.json().get("choices", [{}])[0]
        text = first_choice.get("message", {}).get("content", "[No response]")
        took = time.time() - started
        return {"response": text, "elapsed": round(took, 1), "error": None}
    except Exception as e:
        return {"response": None, "elapsed": 0, "error": str(e)}
query_cloud function · python · L285-L292 (8 LOC)
dex-council.py
def query_cloud(model, prompt, api_keys, timeout=120):
    """Dispatch a prompt to the cloud backend named by ``model["provider"]``.

    Looks up the API key under ``model["env_key"]`` in ``api_keys``; when it
    is missing or empty, returns an error result without making a request.
    The "gemini" provider goes to ``query_gemini``; every other provider goes
    to ``query_openai_compatible`` using ``model["url"]``.
    """
    key = api_keys.get(model["env_key"], "")
    if not key:
        return {
            "response": None,
            "elapsed": 0,
            "error": f"Missing API key: {model['env_key']}",
        }

    if model["provider"] != "gemini":
        return query_openai_compatible(prompt, key, model["url"], model["id"], timeout)
    return query_gemini(prompt, key, model["id"], timeout)
build_governed_prompt function · python · L297-L302 (6 LOC)
dex-council.py
def build_governed_prompt(prompt, rag_context=""):
    """Assemble the final prompt: GOVERNANCE, optional RAG context, then the prompt.

    The sections are joined with newlines; the retrieved-context section is
    only present when ``rag_context`` is non-empty.
    """
    context_section = (
        [f"\nRETRIEVED CONTEXT (from DDL knowledge base):\n{rag_context}"]
        if rag_context
        else []
    )
    sections = [GOVERNANCE, *context_section, f"\nPROMPT:\n{prompt}"]
    return "\n".join(sections)
synthesize function · python · L307-L339 (33 LOC)
dex-council.py
def synthesize(prompt, responses, synthesizer=DEFAULT_SYNTHESIZER):
    response_block = ""
    for i, r in enumerate(responses):
        if r["result"]["response"]:
            response_block += f"\n{'='*60}\nMODEL {i+1}: {r['name']} [{r['provider'].upper()}]\n{'='*60}\n{r['result']['response']}\n"
        else:
            response_block += f"\n{'='*60}\nMODEL {i+1}: {r['name']} [{r['provider'].upper()}]\n{'='*60}\n[FAILED: {r['result']['error']}]\n"

    synthesis_prompt = f"""You are the synthesis engine for a DDL Hybrid AutoCouncil review.

The following prompt was sent independently to {len(responses)} AI models across local and cloud tiers.
Each model received identical governance context from the DDL architecture.
Each model responded without seeing the others' responses.

ORIGINAL PROMPT:
{prompt}

MODEL RESPONSES:
{response_block}

SYNTHESIS INSTRUCTIONS:
1. CONVERGENCE: What did all or most models agree on?
2. DIVERGENCE: Where did models disagree? Note which model said what
save_to_folder function · python · L344-L431 (88 LOC)
dex-council.py
def save_to_folder(folder, prompt, responses, synthesis, rag_context=""):
    os.makedirs(folder, exist_ok=True)

    # Prompt
    with open(os.path.join(folder, "00_prompt.txt"), "w", encoding="utf-8") as f:
        f.write(prompt)

    # RAG context
    if rag_context:
        with open(os.path.join(folder, "00_rag_context.txt"), "w", encoding="utf-8") as f:
            f.write(rag_context)

    # Individual responses
    for i, r in enumerate(responses):
        tier = "LOCAL" if r["provider"] == "local" else "CLOUD"
        safe_name = r["name"].replace(" ", "_").replace("/", "-").replace("(", "").replace(")", "")
        filename = f"{i+1:02d}_{tier}_{safe_name}.txt"
        with open(os.path.join(folder, filename), "w", encoding="utf-8") as f:
            f.write(f"MODEL: {r['name']}\n")
            f.write(f"PROVIDER: {r['provider']}\n")
            f.write(f"MODEL_ID: {r['model_id']}\n")
            f.write(f"ELAPSED: {r['result']['elapsed']}s\n")
            f.write(f"TIMESTAM
Repobility's GitHub App fixes findings like these · https://github.com/apps/repobility-bot
auto_ingest function · python · L436-L475 (40 LOC)
dex-council.py
def auto_ingest(folder):
    """Copy the full transcript to the canon folder and trigger ingestion."""
    transcript = os.path.join(folder, "99_full_transcript.txt")
    if not os.path.exists(transcript):
        print("  [WARN] No transcript to ingest.")
        return

    canon_dir = r"C:\\Users\\dexjr\\99_DexUniverseArchive\\00_Archive\\AutoCouncil-Live"
    os.makedirs(canon_dir, exist_ok=True)

    # Generate unique filename
    ts = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
    basename = os.path.basename(folder).replace(" ", "_")
    dest = os.path.join(canon_dir, f"AutoCouncil_{basename}_{ts}.txt")

    try:
        import shutil
        shutil.copy2(transcript, dest)
        print(f"  Copied transcript to corpus: {dest}")

        # Run ingestion
        ingest_script = os.path.join(SCRIPT_DIR, "dex-ingest.py")
        if os.path.exists(ingest_script):
            print("  Running corpus ingestion...")
            result = subprocess.run(
                ["python", i
resynthesize function · python · L480-L552 (73 LOC)
dex-council.py
def resynthesize(folder, synthesizer=DEFAULT_SYNTHESIZER):
    prompt_path = os.path.join(folder, "00_prompt.txt")
    if not os.path.exists(prompt_path):
        print(f"  ERROR: No prompt file found in {folder}")
        return

    with open(prompt_path, "r", encoding="utf-8") as f:
        prompt = f.read().strip()

    # Read response files
    responses_text = []
    for filename in sorted(os.listdir(folder)):
        if filename.endswith(".txt") and not filename.startswith("00") and not filename.startswith("99"):
            filepath = os.path.join(folder, filename)
            with open(filepath, "r", encoding="utf-8") as f:
                content = f.read()
            lines = content.split("\n")
            name = lines[0].replace("MODEL: ", "") if lines else "Unknown"
            provider = lines[1].replace("PROVIDER: ", "") if len(lines) > 1 else "unknown"
            body = ""
            if "RESPONSE" in content:
                parts = content.split("RESPONSE\n" + "=" *
display_header function · python · L557-L576 (20 LOC)
dex-council.py
def display_header(prompt, local_models, cloud_models, synthesizer, use_rag, save_path, ingest):
    """Print the run banner: prompt preview, model roster and active options.

    The prompt is cut to 80 characters, with "..." appended only when it was
    longer. The Local, Cloud, Save and Ingest lines appear only when their
    argument is truthy.
    """
    rule = "=" * 70

    def roster(label, models):
        # One "Label: count (name, name, ...)" line per non-empty tier.
        names = ', '.join(m['name'] for m in models)
        print(f"  {label}: {len(models)} ({names})")

    model_total = len(local_models) + len(cloud_models)
    ellipsis = '...' if len(prompt) > 80 else ''

    print()
    print(rule)
    print("  DDL HYBRID AUTO-COUNCIL v3.0")
    print(rule)
    print(f"  Prompt: {prompt[:80]}{ellipsis}")
    if local_models:
        roster("Local", local_models)
    if cloud_models:
        roster("Cloud", cloud_models)
    print(f"  Total: {model_total} models")
    print(f"  Synth: {synthesizer}")
    rag_state = 'Active' if use_rag else 'Off'
    print(f"  RAG: {rag_state}")
    if save_path:
        print(f"  Save: {save_path}")
    if ingest:
        print("  Ingest: Auto-ingest to corpus after save")
    print(rule)
    print()
display_response function · python · L578-L592 (15 LOC)
dex-council.py
def display_response(index, name, provider, result, verbose=False):
    """Print one model's result: a failure line, a preview, or the full text.

    Args:
        index: Zero-based position of the model; displayed one-based.
        name: Display name of the model.
        provider: Provider tag; "local" is labelled LOCAL, anything else CLOUD.
        result: Dict with "response", "elapsed" and "error" keys.
        verbose: When true, print the whole response between rules instead
            of a 150-character single-line preview.
    """
    tier = "LOCAL" if provider == "local" else "CLOUD"
    elapsed = f"{result['elapsed']}s" if result['elapsed'] else "—"

    if result["error"]:
        print(f"  [{index+1}] [{tier}] {name} — FAILED ({result['error'][:60]})")
        print()
        return

    # A result can carry no error and still have a None/empty response;
    # show a placeholder instead of crashing on None[:150].
    body = result["response"] or "[No response]"

    print(f"  [{index+1}] [{tier}] {name} ({elapsed})")
    if verbose:
        print(f"{'─'*60}")
        print(body)
        print(f"{'─'*60}")
    else:
        preview = body[:150].replace("\n", " ")
        # Only signal truncation when text was actually cut off.
        suffix = "..." if len(body) > 150 else ""
        print(f"      {preview}{suffix}")
    print()
display_synthesis function · python · L594-L605 (12 LOC)
dex-council.py
def display_synthesis(result, verbose=False):
    """Print the synthesis banner followed by the synthesis text or its error.

    ``result`` is a dict; a truthy "error" entry is printed as an ERROR line,
    otherwise the "response" entry is printed. ``verbose`` is accepted but
    has no effect on the output.
    """
    rule = "=" * 70
    for banner_line in ("", rule, "  SYNTHESIS (by Dexcell, Seat 1010)", rule, ""):
        print(banner_line)

    failure = result.get("error")
    if not failure:
        print(result["response"])
    else:
        print(f"  ERROR: {failure}")

    print()
    print(rule)
log_council function · python · L610-L627 (18 LOC)
dex-council.py
def log_council(prompt, responses, synthesis, synthesizer, use_rag):
    """Append a one-line JSON summary of a council run to LOG_FILE.

    Logging is best-effort: a failure to serialize or write the entry is
    reported as a warning and never propagated to the caller.

    Args:
        prompt: Original prompt; only the first 500 characters are logged.
        responses: Dicts with a "provider" key and a "result" dict holding
            "response" and "error".
        synthesis: Accepted for interface compatibility; not logged.
        synthesizer: Synthesizer identifier recorded in the entry.
        use_rag: Whether RAG was active for the run.
    """
    entry = {
        "timestamp": datetime.datetime.now().isoformat(),
        "version": "3.0",
        "prompt": prompt[:500],
        "synthesizer": synthesizer,
        "rag_active": use_rag,
        "model_count": len(responses),
        "local_count": sum(1 for r in responses if r["provider"] == "local"),
        "cloud_count": sum(1 for r in responses if r["provider"] != "local"),
        "successful": sum(1 for r in responses if r["result"]["response"]),
        "failed": sum(1 for r in responses if r["result"]["error"]),
    }
    try:
        with open(LOG_FILE, "a", encoding="utf-8") as f:
            f.write(json.dumps(entry, ensure_ascii=False) + "\n")
    except Exception as e:
        # Catch Exception rather than using a bare except so that
        # KeyboardInterrupt/SystemExit still propagate, and make the
        # failure visible instead of silently dropping the entry.
        print(f"  [WARN] Could not write council log: {e}")
main function · python · L632-L754 (123 LOC)
dex-council.py
def main():
    parser = argparse.ArgumentParser(description="DDL Hybrid AutoCouncil v3.0")
    parser.add_argument("prompt", nargs="?", default=None, help="Prompt to send")
    parser.add_argument("--local-only", action="store_true", help="Local only")
    parser.add_argument("--cloud-only", action="store_true", help="Cloud only")
    parser.add_argument("--all", action="store_true", help="Local + Cloud")
    parser.add_argument("--synthesizer", default=DEFAULT_SYNTHESIZER)
    parser.add_argument("--rag", action="store_true", help="Enable RAG")
    parser.add_argument("--raw", action="store_true", help="Archive RAG")
    parser.add_argument("--from-file", default=None, help="Prompt from file")
    parser.add_argument("--top", type=int, default=TOP_K)
    parser.add_argument("--no-governance", action="store_true")
    parser.add_argument("--save", default=None, help="Save to folder")
    parser.add_argument("--ingest", action="store_true", help="Auto-ingest to corpus")
    parser.add_
load_env function · python · L109-L118 (10 LOC)
dex-deliberate.py
def load_env():
    """Read KEY=VALUE pairs from ENV_FILE into a dict.

    Blank lines, "#" comment lines and lines without "=" are skipped; keys
    and values are whitespace-stripped. A missing file yields an empty dict.
    """
    if not os.path.exists(ENV_FILE):
        return {}

    with open(ENV_FILE, "r") as f:
        stripped = (line.strip() for line in f)
        # Split on the first "=" only, so values may contain "=".
        pairs = (
            text.split("=", 1)
            for text in stripped
            if text and not text.startswith("#") and "=" in text
        )
        return {k.strip(): v.strip() for k, v in pairs}
Repobility · code-quality intelligence · https://repobility.com
retrieve_context function · python · L123-L143 (21 LOC)
dex-deliberate.py
def retrieve_context(query, top_k=TOP_K, use_raw=False):
    try:
        import chromadb
        client = chromadb.PersistentClient(path=CHROMA_DIR)
        collection = client.get_collection(RAW_COLLECTION if use_raw else CANON_COLLECTION)
        r = requests.post(OLLAMA_EMBED_URL, json={"model": EMBED_MODEL, "prompt": query}, timeout=60)
        r.raise_for_status()
        embedding = r.json().get("embedding")
        if not embedding:
            return ""
        results = collection.query(query_embeddings=[embedding], n_results=top_k, include=["documents", "metadatas"])
        chunks = []
        if results and results["documents"]:
            for i, doc in enumerate(results["documents"][0]):
                meta = results["metadatas"][0][i] if results["metadatas"] else {}
                source = meta.get("source_file", "unknown")
                chunks.append(f"[Source: {source}]\n{doc[:400]}")
        return "\n\n".join(chunks)
    except Exception as e:
        print(f"  
query_local function · python · L148-L155 (8 LOC)
dex-deliberate.py
def query_local(model_id, prompt, timeout=180):
    """POST a non-streaming generation request to OLLAMA_URL.

    Returns a dict with "response", "elapsed" (seconds, one decimal) and
    "error". On any exception the response is None, elapsed is 0 and the
    error holds the exception text.
    """
    payload = {
        "model": model_id,
        "prompt": prompt,
        "stream": False,
        "options": {"temperature": 0.4, "num_ctx": 16384},
    }
    try:
        started = time.time()
        reply = requests.post(OLLAMA_URL, json=payload, timeout=timeout)
        reply.raise_for_status()
        answer = reply.json().get("response", "")
        return {
            "response": answer,
            "elapsed": round(time.time() - started, 1),
            "error": None,
        }
    except Exception as e:
        return {"response": None, "elapsed": 0, "error": str(e)}
query_gemini function · python · L157-L166 (10 LOC)
dex-deliberate.py
def query_gemini(prompt, api_key, model_id="gemini-1.5-flash"):
    """Send a single-turn prompt to the Gemini generateContent endpoint.

    Args:
        prompt: Text sent as the only content part.
        api_key: Gemini API key.
        model_id: Model name placed in the request URL.

    Returns:
        Dict with "response", "elapsed" (seconds, one decimal) and "error".
        On any exception the response is None, elapsed is 0 and the error
        holds the exception text.
    """
    # The key travels in a header rather than the query string: exception
    # messages from requests include the URL, and the error text is returned
    # to callers, so a key in the URL would leak into that text.
    url = f"https://generativelanguage.googleapis.com/v1beta/models/{model_id}:generateContent"
    try:
        start = time.time()
        r = requests.post(
            url,
            json={"contents": [{"parts": [{"text": prompt}]}]},
            headers={
                "Content-Type": "application/json",
                "x-goog-api-key": api_key,
            },
            timeout=120,
        )
        r.raise_for_status()
        # Treat an empty "candidates"/"parts" list like a missing one instead
        # of failing with IndexError.
        candidates = r.json().get("candidates") or [{}]
        parts = candidates[0].get("content", {}).get("parts") or [{}]
        text = parts[0].get("text", "")
        return {"response": text, "elapsed": round(time.time() - start, 1), "error": None}
    except Exception as e:
        return {"response": None, "elapsed": 0, "error": str(e)}
query_openai_compat function · python · L168-L176 (9 LOC)
dex-deliberate.py
def query_openai_compat(prompt, api_key, url, model_id, timeout=120):
    """POST a single user message to a chat-completions style endpoint.

    Returns a dict with "response", "elapsed" (seconds, one decimal) and
    "error". On any exception the response is None, elapsed is 0 and the
    error holds the exception text.
    """
    body = {
        "model": model_id,
        "messages": [{"role": "user", "content": prompt}],
        "temperature": 0.4,
        "max_tokens": 4096,
    }
    auth_headers = {
        "Content-Type": "application/json",
        "Authorization": f"Bearer {api_key}",
    }
    try:
        started = time.time()
        reply = requests.post(url, json=body, headers=auth_headers, timeout=timeout)
        reply.raise_for_status()
        first_choice = reply.json().get("choices", [{}])[0]
        text = first_choice.get("message", {}).get("content", "")
        return {
            "response": text,
            "elapsed": round(time.time() - started, 1),
            "error": None,
        }
    except Exception as e:
        return {"response": None, "elapsed": 0, "error": str(e)}
query_model function · python · L178-L186 (9 LOC)
dex-deliberate.py
def query_model(model, prompt, api_keys):
    """Route a prompt to the backend selected by ``model["provider"]``.

    "local" goes to ``query_local``. Every other provider needs an API key,
    looked up in ``api_keys`` under ``model["env_key"]``; without one an
    error result is returned and no request is made. "gemini" then goes to
    ``query_gemini`` and anything else to ``query_openai_compat``.
    """
    provider = model["provider"]
    if provider == "local":
        return query_local(model["id"], prompt)

    key = api_keys.get(model.get("env_key", ""), "")
    if not key:
        missing = f"Missing key: {model.get('env_key')}"
        return {"response": None, "elapsed": 0, "error": missing}

    if provider != "gemini":
        return query_openai_compat(prompt, key, model["url"], model["id"])
    return query_gemini(prompt, key, model["id"])
generate_followup function · python · L191-L215 (25 LOC)
dex-deliberate.py
def generate_followup(topic, round_num, all_responses, synthesizer=DEFAULT_SYNTHESIZER):
    response_block = ""
    for r in all_responses:
        if r["result"]["response"]:
            response_block += f"\n--- {r['name']} ---\n{r['result']['response'][:800]}\n"

    prompt = f"""You are the deliberation moderator for a DDL AutoCouncil session.

TOPIC: {topic}

The following responses were collected in Round {round_num}:
{response_block}

Your job:
1. Identify the 2-3 most important UNRESOLVED disagreements or open questions from this round.
2. For each, write a sharp follow-up question that forces the models to go deeper.
3. If any model made a claim without evidence, call it out and ask for specifics.
4. If the models are converging too quickly, introduce a devil's advocate angle.

Output ONLY the follow-up prompt that will be sent to all models for Round {round_num + 1}.
Do not include preamble or explanation. Just the follow-up prompt.
Keep it under 300 words."""

    result = 
final_synthesis function · python · L220-L248 (29 LOC)
dex-deliberate.py
def final_synthesis(topic, all_rounds, synthesizer=DEFAULT_SYNTHESIZER):
    rounds_block = ""
    for round_num, round_data in enumerate(all_rounds):
        rounds_block += f"\n{'='*60}\nROUND {round_num + 1}\n{'='*60}\n"
        rounds_block += f"PROMPT: {round_data['prompt'][:200]}\n\n"
        for r in round_data["responses"]:
            if r["result"]["response"]:
                rounds_block += f"--- {r['name']} ({r['provider']}) ---\n{r['result']['response'][:600]}\n\n"

    prompt = f"""You are producing the FINAL SYNTHESIS for a multi-round DDL AutoCouncil deliberation.

TOPIC: {topic}

DELIBERATION RECORD:
{rounds_block}

SYNTHESIS INSTRUCTIONS:
1. ARC: How did the discussion evolve across rounds? What shifted?
2. CONVERGENCE: What did the models ultimately agree on?
3. PERSISTENT DIVERGENCE: What remained unresolved even after multiple rounds?
4. KEY INSIGHT: What was the single most valuable insight across all rounds? Name the model and round.
5. RISKS IDENTIFIED: What ri
save_deliberation function · python · L253-L307 (55 LOC)
dex-deliberate.py
def save_deliberation(folder, topic, all_rounds, final_synth, rag_context=""):
    os.makedirs(folder, exist_ok=True)

    # Topic
    with open(os.path.join(folder, "00_topic.txt"), "w", encoding="utf-8") as f:
        f.write(topic)

    if rag_context:
        with open(os.path.join(folder, "00_rag_context.txt"), "w", encoding="utf-8") as f:
            f.write(rag_context)

    # Each round
    for round_num, round_data in enumerate(all_rounds):
        round_dir = os.path.join(folder, f"round_{round_num + 1:02d}")
        os.makedirs(round_dir, exist_ok=True)

        with open(os.path.join(round_dir, "prompt.txt"), "w", encoding="utf-8") as f:
            f.write(round_data["prompt"])

        if round_data.get("followup"):
            with open(os.path.join(round_dir, "followup.txt"), "w", encoding="utf-8") as f:
                f.write(round_data["followup"])

        for i, r in enumerate(round_data["responses"]):
            tier = "LOCAL" if r["provider"] == "local" else "CL
Citation: Repobility (2026). State of AI-Generated Code. https://repobility.com/research/
main function · python · L312-L480 (169 LOC)
dex-deliberate.py
def main():
    parser = argparse.ArgumentParser(description="DDL AutoCouncil Deliberation Engine v1.0")
    parser.add_argument("topic", nargs="?", default=None, help="Deliberation topic")
    parser.add_argument("--rounds", type=int, default=3, help="Number of deliberation rounds")
    parser.add_argument("--local-only", action="store_true", help="Local models only")
    parser.add_argument("--cloud-only", action="store_true", help="Cloud models only")
    parser.add_argument("--all", action="store_true", help="Both local + cloud")
    parser.add_argument("--rag", action="store_true", help="Enable RAG")
    parser.add_argument("--raw", action="store_true", help="Use archive for RAG")
    parser.add_argument("--from-file", default=None, help="Read topic from file")
    parser.add_argument("--top", type=int, default=TOP_K, help="RAG chunks")
    parser.add_argument("--save", default=None, help="Save deliberation to folder")
    parser.add_argument("--synthesizer", default=DEFAULT_SYNTH
HTMLStripper class · python · L82-L119 (38 LOC)
dex-fetch.py
class HTMLStripper(HTMLParser):
    def __init__(self):
        super().__init__()
        self.result = []
        self.skip_tags = {'script', 'style', 'head', 'meta', 'link', 'noscript'}
        self.current_skip = False
        self.skip_depth = 0

    def handle_starttag(self, tag, attrs):
        if tag in self.skip_tags:
            self.current_skip = True
            self.skip_depth += 1
        elif tag in ('br', 'hr'):
            self.result.append('\n')
        elif tag in ('p', 'div', 'h1', 'h2', 'h3', 'h4', 'h5', 'h6', 'li', 'tr', 'section', 'article'):
            self.result.append('\n')

    def handle_endtag(self, tag):
        if tag in self.skip_tags:
            self.skip_depth -= 1
            if self.skip_depth <= 0:
                self.current_skip = False
                self.skip_depth = 0
        elif tag in ('p', 'div', 'h1', 'h2', 'h3', 'h4', 'h5', 'h6', 'li', 'tr', 'section', 'article'):
            self.result.append('\n')

    def handle_data(self, data
__init__ method · python · L83-L88 (6 LOC)
dex-fetch.py
    def __init__(self):
        super().__init__()
        self.result = []
        self.skip_tags = {'script', 'style', 'head', 'meta', 'link', 'noscript'}
        self.current_skip = False
        self.skip_depth = 0
handle_starttag method · python · L90-L97 (8 LOC)
dex-fetch.py
    def handle_starttag(self, tag, attrs):
        """Open a skip region for skipped tags, or emit a newline for block tags.

        ``meta`` and ``link`` are void elements: they never get an end tag, so
        they must not increase ``skip_depth``. Counting them left the depth
        above zero after ``</head>``, which kept skipping switched on for the
        rest of the document.
        """
        if tag in self.skip_tags:
            if tag in ('meta', 'link'):
                return
            self.current_skip = True
            self.skip_depth += 1
        elif tag in ('br', 'hr'):
            self.result.append('\n')
        elif tag in ('p', 'div', 'h1', 'h2', 'h3', 'h4', 'h5', 'h6', 'li', 'tr', 'section', 'article'):
            self.result.append('\n')

    def handle_startendtag(self, tag, attrs):
        """Handle self-closing tags such as ``<br/>`` or ``<meta ... />``.

        Self-closing ``meta``/``link`` are ignored entirely so that the end-tag
        half does not decrement a depth their start-tag half never raised.
        Every other tag keeps the start-then-end handling.
        """
        if tag in ('meta', 'link'):
            return
        self.handle_starttag(tag, attrs)
        self.handle_endtag(tag)
handle_endtag method · python · L99-L106 (8 LOC)
dex-fetch.py
    def handle_endtag(self, tag):
        """Close one level of a skip region, or emit a newline after a block tag."""
        if tag not in self.skip_tags:
            block_tags = ('p', 'div', 'h1', 'h2', 'h3', 'h4', 'h5', 'h6', 'li', 'tr', 'section', 'article')
            if tag in block_tags:
                self.result.append('\n')
            return

        self.skip_depth -= 1
        if self.skip_depth > 0:
            return
        # Back at (or below) the outermost level: clamp and resume collecting.
        self.skip_depth = 0
        self.current_skip = False
handle_data method · python · L108-L112 (5 LOC)
dex-fetch.py
    def handle_data(self, data):
        """Collect stripped, non-empty text unless a skip region is active."""
        if self.current_skip:
            return
        fragment = data.strip()
        if not fragment:
            return
        self.result.append(fragment)
get_text method · python · L114-L119 (6 LOC)
dex-fetch.py
    def get_text(self):
        """Join the collected fragments and normalise whitespace.

        Fragments are joined with single spaces; newline runs separated only
        by whitespace collapse to one blank line, runs of two or more spaces
        collapse to one, and the result is stripped.
        """
        joined = ' '.join(self.result)
        paragraphs_collapsed = re.sub(r'\n\s*\n', '\n\n', joined)
        spaces_collapsed = re.sub(r'  +', ' ', paragraphs_collapsed)
        return spaces_collapsed.strip()
strip_html function · python · L121-L124 (4 LOC)
dex-fetch.py
def strip_html(html_content):
    """Return the text of ``html_content`` as extracted by HTMLStripper."""
    parser = HTMLStripper()
    parser.feed(html_content)
    extracted = parser.get_text()
    return extracted
Source: Repobility analyzer · https://repobility.com
fetch_page function · python · L129-L144 (16 LOC)
dex-fetch.py
def fetch_page(url, timeout=30):
    """GET ``url`` and return its body plus basic response metadata.

    Returns a dict with "html", "status", "content_type", "size" (length of
    the decoded text) and "error". On any exception, including a non-success
    HTTP status, "html" is None, the other fields are zero/empty and "error"
    holds the exception text.
    """
    try:
        request_headers = {
            'User-Agent': 'DexJr-Fetch/1.0 (DDL Local AI; +https://dropdownlogistics.com)',
        }
        resp = requests.get(url, headers=request_headers, timeout=timeout)
        resp.raise_for_status()
        body = resp.text
        page = {"html": body, "status": resp.status_code}
        page["content_type"] = resp.headers.get('content-type', '')
        page["size"] = len(body)
        page["error"] = None
        return page
    except Exception as e:
        return {"html": None, "status": 0, "content_type": "", "size": 0, "error": str(e)}
retrieve_context function · python · L149-L177 (29 LOC)
dex-fetch.py
def retrieve_context(query, top_k=TOP_K):
    try:
        import chromadb
        client = chromadb.PersistentClient(path=CHROMA_DIR)
        collection = client.get_collection(CANON_COLLECTION)
        r = requests.post(
            OLLAMA_EMBED_URL,
            json={"model": EMBED_MODEL, "prompt": query},
            timeout=60,
        )
        r.raise_for_status()
        embedding = r.json().get("embedding")
        if not embedding:
            return ""
        results = collection.query(
            query_embeddings=[embedding],
            n_results=top_k,
            include=["documents", "metadatas"],
        )
        chunks = []
        if results and results["documents"]:
            for i, doc in enumerate(results["documents"][0]):
                meta = results["metadatas"][0][i] if results["metadatas"] else {}
                source = meta.get("source_file", "unknown")
                chunks.append(f"[Source: {source}]\n{doc[:400]}")
        return "\n\n".join(chunk
ask_dexjr function · python · L182-L214 (33 LOC)
dex-fetch.py
def ask_dexjr(page_text, question, rag_context="", model=DEFAULT_MODEL):
    prompt_parts = [PAGE_GOVERNANCE]

    if rag_context:
        prompt_parts.append(f"\nRAG CONTEXT (from DDL corpus):\n{rag_context}")

    prompt_parts.append(f"\nPAGE CONTENT:\n{page_text[:12000]}")
    prompt_parts.append(f"\nQUESTION:\n{question}")
    prompt_parts.append("\nAnalyze the page content and answer the question. Be specific. Cite exact elements from the page.")

    prompt = "\n".join(prompt_parts)

    try:
        start = time.time()
        r = requests.post(
            OLLAMA_URL,
            json={
                "model": model,
                "prompt": prompt,
                "stream": False,
                "options": {"temperature": 0.3, "num_ctx": 16384},
            },
            timeout=180,
        )
        r.raise_for_status()
        elapsed = time.time() - start
        return {
            "response": r.json().get("response", "[No response]"),
            "elapsed": round(el
crawl_sitemap function · python · L219-L270 (52 LOC)
dex-fetch.py
def crawl_sitemap(base_url, save_dir):
    """Fetch sitemap.xml and crawl all pages."""
    os.makedirs(save_dir, exist_ok=True)

    sitemap_url = f"{base_url.rstrip('/')}/sitemap.xml"
    print(f"  Fetching sitemap: {sitemap_url}")

    result = fetch_page(sitemap_url)
    if result["error"]:
        # Try sitemap-0.xml (common for Next.js)
        sitemap_url = f"{base_url.rstrip('/')}/sitemap-0.xml"
        print(f"  Trying: {sitemap_url}")
        result = fetch_page(sitemap_url)

    if result["error"]:
        print(f"  ERROR: Could not fetch sitemap: {result['error']}")
        print(f"  Falling back to known routes...")
        return []

    # Extract URLs from sitemap XML
    urls = re.findall(r'<loc>(.*?)</loc>', result["html"])
    print(f"  Found {len(urls)} URLs in sitemap")

    fetched = []
    for i, url in enumerate(urls):
        print(f"  [{i+1}/{len(urls)}] {url}...", end=" ", flush=True)
        page = fetch_page(url)
        if page["error"]:
            print(f
save_text function · python · L275-L282 (8 LOC)
dex-fetch.py
def save_text(text, url, filepath):
    """Write page text to *filepath* behind a short metadata header.

    The file starts with ``URL``, ``FETCHED`` (local ISO timestamp) and
    ``SIZE`` lines, followed by a blank line and then *text*.

    Args:
        text: Text to store.
        url: Source URL recorded in the header.
        filepath: Destination path. Missing parent directories are created;
            a bare filename is written relative to the current directory.
    """
    # os.path.dirname("out.txt") is "", and os.makedirs("") raises
    # FileNotFoundError, so only create the parent when there is one.
    parent = os.path.dirname(filepath)
    if parent:
        os.makedirs(parent, exist_ok=True)
    with open(filepath, "w", encoding="utf-8") as f:
        f.write(f"URL: {url}\n")
        f.write(f"FETCHED: {datetime.datetime.now().isoformat()}\n")
        f.write(f"SIZE: {len(text)} chars\n\n")
        f.write(text)
    print(f"  Saved to: {filepath}")
ingest_text function · python · L284-L296 (13 LOC)
dex-fetch.py
def ingest_text(text, url):
    """Save to canon folder for next ingestion sweep.

    The file is named ``WebFetch_<slug>_<YYYYmmdd_HHMMSS>.txt`` inside
    ``CANON_DIR`` and carries the same ``URL`` / ``FETCHED`` / ``SIZE``
    header that ``save_text`` writes.

    Args:
        text: Text to store.
        url: Source URL; also used to derive the filename slug.
    """
    os.makedirs(CANON_DIR, exist_ok=True)

    # Drop the scheme, then replace every character that cannot appear in a
    # filename on Windows (and "/" everywhere) with "_". URLs carrying a
    # port or a query string would otherwise yield an unusable path.
    slug = url.replace("https://", "").replace("http://", "")
    slug = re.sub(r'[<>:"/\\|?*\x00-\x1f]', "_", slug).rstrip("_")
    # Keep the final name comfortably below common 255-character limits.
    slug = slug[:150]

    # One timestamp for both the filename and the header so they agree.
    now = datetime.datetime.now()
    filename = f"WebFetch_{slug}_{now.strftime('%Y%m%d_%H%M%S')}.txt"
    filepath = os.path.join(CANON_DIR, filename)
    with open(filepath, "w", encoding="utf-8") as f:
        f.write(f"URL: {url}\n")
        f.write(f"FETCHED: {now.isoformat()}\n")
        f.write(f"SIZE: {len(text)} chars\n\n")
        f.write(text)
    print(f"  Ingested to corpus: {filepath}")
log_fetch function · python · L301-L314 (14 LOC)
dex-fetch.py
def log_fetch(url, text_length, question, response_length, elapsed):
    """Append one JSON line describing a fetch to ``LOG_FILE`` (best effort).

    Args:
        url: URL that was fetched.
        text_length: Stored under ``text_chars``.
        question: Question asked about the page; stored truncated to 200
            characters, or as ``None`` when empty/absent.
        response_length: Stored under ``response_chars``.
        elapsed: Stored under ``elapsed``.

    Logging must never break a fetch, so I/O and serialization failures are
    reported as a warning and otherwise ignored. Unlike a bare ``except``,
    ``KeyboardInterrupt`` and ``SystemExit`` still propagate.
    """
    entry = {
        "timestamp": datetime.datetime.now().isoformat(),
        "url": url,
        "text_chars": text_length,
        "question": question[:200] if question else None,
        "response_chars": response_length,
        "elapsed": elapsed,
    }
    try:
        # Serialize before opening so a bad value never touches the file.
        line = json.dumps(entry, ensure_ascii=False) + "\n"
        with open(LOG_FILE, "a", encoding="utf-8") as f:
            f.write(line)
    except (OSError, TypeError, ValueError) as exc:
        print(f"  WARN: could not write fetch log: {exc}")
display_result function · python · L319-L342 (24 LOC)
dex-fetch.py
def display_result(url, text, question, result, rag_used):
    """Print the page-analysis report to stdout.

    The report is a banner, a summary (URL, page text length, RAG state and,
    when given, the first 80 characters of the question), then the model
    response or its error, and a footer with the elapsed time and log path.

    Args:
        url: URL that was analysed.
        text: Page text; only its length is shown.
        question: Question asked, or a falsy value to omit the line.
        result: Dict read for ``response``, ``error`` and ``elapsed``; a
            falsy value skips the body and the elapsed line.
        rag_used: Truthy when RAG context was used.
    """
    rule = "=" * 70
    rag_state = 'Active' if rag_used else 'Off'

    # Banner and summary.
    print()
    print(rule, "  DEX JR WEB FETCH — Page Analysis", rule, sep="\n")
    print(f"  URL: {url}")
    print(f"  Page text: {len(text)} chars")
    print(f"  RAG: {rag_state}")
    if question:
        print(f"  Question: {question[:80]}")
    print(rule)
    print()

    # Body: a response takes precedence over an error message.
    if result:
        if result.get("response"):
            print(result["response"])
        elif result.get("error"):
            print(f"  ERROR: {result['error']}")

    # Footer.
    print()
    print(rule)
    if result:
        print(f"  Elapsed: {result.get('elapsed', 0)}s")
    print(f"  Log: {LOG_FILE}")
    print(rule)
Repobility's GitHub App fixes findings like these · https://github.com/apps/repobility-bot
main function · python · L347-L425 (79 LOC)
dex-fetch.py
def main():
    parser = argparse.ArgumentParser(description="Dex Jr Web Fetch v1.0")
    parser.add_argument("url", nargs="?", default=None, help="URL to fetch")
    parser.add_argument("--ask", default=None, help="Question to ask about the page")
    parser.add_argument("--rag", action="store_true", help="Also retrieve RAG context")
    parser.add_argument("--raw", action="store_true", help="Just show stripped text")
    parser.add_argument("--save", default=None, help="Save stripped text to file")
    parser.add_argument("--ingest", action="store_true", help="Save to corpus for ingestion")
    parser.add_argument("--model", default=DEFAULT_MODEL, help="LLM model to use")
    parser.add_argument("--sitemap", default=None, help="Crawl sitemap from base URL")
    parser.add_argument("--top", type=int, default=TOP_K, help="RAG chunks to retrieve")

    args = parser.parse_args()

    # Sitemap crawl mode
    if args.sitemap:
        save_dir = args.save or os.path.join(SCRIPT_DIR, "fetc
classify_tier function · python · L116-L124 (9 LOC)
dex-ingest.py
def classify_tier(rel_path: str, filename: str, folder: str) -> Tuple[str, str]:
    """Classify a file into a two-string tuple from its path components.

    The three arguments are joined with spaces and lower-cased, then checked
    for substrings from the canon, foundation and archive marker lists, in
    that order. The first list with a hit decides the result.

    Returns:
        ``("canon", "ratified")``, ``("foundation", "conceptual")``,
        ``("archive", "historical")`` or ``("unknown", "unknown")``.
    """
    # NOTE(review): markers are compared against lower-cased text, so they
    # are presumably stored lower-case -- confirm at their definition.
    haystack = "{} {} {}".format(rel_path, filename, folder).lower()

    # Order matters: earlier rules win when several marker lists match.
    ordered_rules = (
        (CANON_PATH_MARKERS, ("canon", "ratified")),
        (FOUNDATION_PATH_MARKERS, ("foundation", "conceptual")),
        (ARCHIVE_PATH_MARKERS, ("archive", "historical")),
    )
    for markers, verdict in ordered_rules:
        for marker in markers:
            if marker in haystack:
                return verdict
    return ("unknown", "unknown")
infer_source_type function · python · L127-L154 (28 LOC)
dex-ingest.py
def infer_source_type(filename: str, extension: str) -> str:
    """
    Infer STD-DDL-METADATA-001 source_type enum value from filename + extension.

    Extension-first for code/data/web. Filename-prefix override for text
    files that are classifiable by naming convention (council reviews,
    governance docs, synthesis, system telemetry). Fallback: "unknown".
    """
    ext = (extension or "").lower()
    if ext in CODE_EXTENSIONS:
        return "code"
    if ext == ".csv":
        return "spreadsheet"
    if ext in (".html", ".mhtml"):
        return "web_archive"
    # Text-file filename-prefix rules (.md / .txt)
    name = filename or ""
    if name.startswith("DDLCouncilReview_"):
        return "council_review"
    if name.startswith("SYNTH-") or "_SYNTH." in name:
        return "council_synthesis"
    if any(name.startswith(p) for p in ("ADR-", "STD-", "PRO-", "CR-")):
        return "governance"
    if name.startswith("sweep_") and name.endswith(".md"):
        return "s
‹ prevpage 2 / 4next ›