Function bodies 161 total
convert_vcf function · python · L273-L313 (41 LOC)dex-convert.py
def convert_vcf(file_path: Path, out_dir: Path) -> list[Path]:
"""Convert VCF contacts to readable text."""
converted_date = datetime.now().strftime("%Y-%m-%d")
header = source_header(str(file_path), "vcf-contacts", converted_date)
try:
with open(file_path, encoding="utf-8", errors="replace") as f:
raw = f.read()
except Exception as e:
print(f" [WARN] VCF read error: {e}")
return []
contacts = []
current = []
for line in raw.splitlines():
if line.startswith("BEGIN:VCARD"):
current = []
elif line.startswith("END:VCARD"):
contacts.append(current)
current = []
else:
current.append(line)
lines = [f"GOOGLE CONTACTS — {len(contacts)} contacts\n"]
for i, contact in enumerate(contacts):
contact_lines = [f"--- Contact {i+1} ---"]
for field in contact:
if ":" in field:
key, _, val = field.partition(":")convert_mbox function · python · L317-L396 (80 LOC)dex-convert.py
def convert_mbox(file_path: Path, out_dir: Path, max_emails: int = 0) -> list[Path]:
"""Convert MBOX to individual email text files. Groups into chunks."""
converted_date = datetime.now().strftime("%Y-%m-%d")
ensure_dir(out_dir)
print(f"\n Processing MBOX: {file_path.name}")
print(f" This may take a while for large files...")
try:
mbox = mailbox.mbox(str(file_path))
except Exception as e:
print(f" [FAIL] Could not open MBOX: {e}")
return []
output_files = []
batch = []
batch_num = 1
batch_size = 500 # emails per output file
count = 0
for i, message in enumerate(mbox):
if max_emails and i >= max_emails:
break
try:
date = str(message.get("Date", ""))
subject = str(message.get("Subject", "(no subject)"))
sender = str(message.get("From", ""))
to = str(message.get("To", ""))
# Get text body
convert_facebook_messages function · python · L400-L445 (46 LOC)dex-convert.py
def convert_facebook_messages(fb_dir: Path, out_dir: Path) -> list[Path]:
"""Convert Facebook message JSON exports to text."""
converted_date = datetime.now().strftime("%Y-%m-%d")
output_files = []
msg_dir = fb_dir / "messages"
if not msg_dir.exists():
msg_dir = fb_dir # try the dir itself
json_files = list(msg_dir.rglob("message_*.json"))
if not json_files:
json_files = list(msg_dir.rglob("*.json"))
print(f" Found {len(json_files)} Facebook message JSON files")
for jf in json_files:
try:
with open(jf, encoding="utf-8", errors="replace") as f:
data = json.load(f)
except Exception:
continue
participants = data.get("participants", [])
participant_names = [p.get("name", "?") for p in participants]
messages = data.get("messages", [])
lines = [
source_header(str(jf), "facebook-messages", converted_date),
f"CONVERSATION:chunk_file function · python · L449-L466 (18 LOC)dex-convert.py
def chunk_file(content: str, stem: str, out_dir: Path,
               chunk_size: int, file_type: str) -> list[Path]:
    """Split large content into numbered chunk files.

    Each slice of at most ``chunk_size`` characters is handed to
    write_output() with the path ``<out_dir>/<stem>_chunk_NNNofMMM.txt``.
    Empty content still produces a single (empty) chunk.

    Args:
        content: Full text to split.
        stem: Base name used in the chunk filenames and labels.
        out_dir: Directory the chunk paths are built under.
        chunk_size: Maximum characters per chunk; must be positive.
        file_type: Not used by this function; kept for caller compatibility.

    Returns:
        The chunk paths, in order.

    Raises:
        ValueError: If chunk_size is zero or negative.
    """
    if chunk_size <= 0:
        raise ValueError(f"chunk_size must be positive, got {chunk_size}")

    # Ceiling division. The former `len // size + 1` over-counted by one when
    # the length was an exact multiple of chunk_size, so files were labelled
    # "...of003" although only two were ever written.
    total_chunks = max(1, -(-len(content) // chunk_size))
    print(f" Chunking {stem} → {total_chunks} files ({chunk_size/1000:.0f}K chars each)")

    chunks = []
    for i in range(total_chunks):
        start = i * chunk_size
        chunk = content[start:start + chunk_size]
        out_path = out_dir / f"{stem}_chunk_{i+1:03d}of{total_chunks:03d}.txt"
        write_output(chunk, out_path, f"{stem} chunk {i+1}/{total_chunks}")
        chunks.append(out_path)
    return chunks
# copy_to_canon function · python · L470-L485 (16 LOC)dex-convert.py
def copy_to_canon(files: list[Path], canon_dir: str = CANON_FOLDER):
    """Copy converted files to canon folder for next sweep/ingest.

    Prints a warning and returns without copying when ``canon_dir`` does not
    exist. Each file is copied with shutil.copy2 under its own name; a failed
    copy is reported and the remaining files are still attempted.
    """
    import shutil

    target_root = Path(canon_dir)
    if not target_root.exists():
        print(f" [WARN] Canon folder not found: {canon_dir}")
        return

    print(f"\n Copying {len(files)} files to canon...")
    for src in files:
        destination = target_root / src.name
        try:
            shutil.copy2(src, destination)
            print(f" [OK] {src.name}")
        except Exception as e:
            print(f" [WARN] Copy failed for {src.name}: {e}")
# main function · python · L489-L603 (115 LOC)dex-convert.py
def main():
parser = argparse.ArgumentParser(description="dex-convert.py — Format converter for Dex Jr. corpus")
parser.add_argument("--file", help="Single file to convert")
parser.add_argument("--dir", help="Directory to convert")
parser.add_argument("--ext", help="File extension filter for --dir (e.g. html, csv)")
parser.add_argument("--type", help="Force conversion type: html, csv, reddit-csv, json, vcf, facebook, mbox")
parser.add_argument("--all-csv", action="store_true", help="Convert all CSVs in dir as Reddit exports")
parser.add_argument("--mbox", help="MBOX file path (Gmail)")
parser.add_argument("--chunk", type=int, default=0,
help=f"Chunk size in chars (0=no chunking, default threshold={DEFAULT_CHUNK_SIZE:,})")
parser.add_argument("--out-dir", default=DEFAULT_OUT_DIR, help="Output directory")
parser.add_argument("--to-canon", action="store_true", help="Copy results to canon foload_env function · python · L108-L117 (10 LOC)dex-council.py
def load_env():
    """Read KEY=VALUE pairs from ENV_FILE into a dict.

    Blank lines, lines starting with '#', and lines without '=' are ignored.
    Keys and values are whitespace-stripped; only the first '=' splits.
    Returns an empty dict when ENV_FILE does not exist.
    """
    env = {}
    if not os.path.exists(ENV_FILE):
        return env
    with open(ENV_FILE, "r") as handle:
        for raw in handle:
            entry = raw.strip()
            if not entry or entry.startswith("#") or "=" not in entry:
                continue
            name, _, value = entry.partition("=")
            env[name.strip()] = value.strip()
    return env
# Source: Repobility analyzer · https://repobility.com
retrieve_context function · python · L185-L214 (30 LOC)dex-council.py
def retrieve_context(query, top_k=TOP_K, use_raw=False):
try:
import chromadb
client = chromadb.PersistentClient(path=CHROMA_DIR)
col_name = RAW_COLLECTION if use_raw else CANON_COLLECTION
collection = client.get_collection(col_name)
r = requests.post(
OLLAMA_EMBED_URL,
json={"model": EMBED_MODEL, "prompt": query},
timeout=60,
)
r.raise_for_status()
embedding = r.json().get("embedding")
if not embedding:
return ""
results = collection.query(
query_embeddings=[embedding],
n_results=top_k,
include=["documents", "metadatas"],
)
chunks = []
if results and results["documents"]:
for i, doc in enumerate(results["documents"][0]):
meta = results["metadatas"][0][i] if results["metadatas"] else {}
source = meta.get("source_file", "unknown")
chunks.aquery_local function · python · L219-L240 (22 LOC)dex-council.py
def query_local(model_id, prompt, timeout=DEFAULT_TIMEOUT):
    """POST a non-streaming generation request to OLLAMA_URL.

    Returns a dict with keys "response", "elapsed" (seconds, one decimal)
    and "error". Any exception is converted to
    ``{"response": None, "elapsed": 0, "error": str(exc)}``.
    """
    try:
        began = time.time()
        payload = {
            "model": model_id,
            "prompt": prompt,
            "stream": False,
            "options": {"temperature": 0.3, "num_ctx": 16384},
        }
        reply = requests.post(OLLAMA_URL, json=payload, timeout=timeout)
        reply.raise_for_status()
        took = time.time() - began
        answer = reply.json().get("response", "[No response]")
    except Exception as e:
        return {"response": None, "elapsed": 0, "error": str(e)}
    return {"response": answer, "elapsed": round(took, 1), "error": None}
# query_gemini function · python · L242-L258 (17 LOC)dex-council.py
def query_gemini(prompt, api_key, model_id="gemini-2.5-flash", timeout=120):
    """Send a prompt to the Gemini generateContent endpoint.

    Args:
        prompt: Text sent as the single content part.
        api_key: Gemini API key.
        model_id: Model name placed in the request URL.
        timeout: Request timeout in seconds.

    Returns:
        A dict with keys "response", "elapsed" (seconds, one decimal) and
        "error". Any exception is converted to
        ``{"response": None, "elapsed": 0, "error": str(exc)}``.
    """
    # The key travels in a header, not the query string: request exceptions
    # embed the full URL in their message, and that message is returned in
    # "error", which would otherwise expose the key wherever it is shown.
    url = f"https://generativelanguage.googleapis.com/v1beta/models/{model_id}:generateContent"
    try:
        start = time.time()
        r = requests.post(
            url,
            json={"contents": [{"parts": [{"text": prompt}]}]},
            headers={
                "Content-Type": "application/json",
                "x-goog-api-key": api_key,
            },
            timeout=timeout,
        )
        r.raise_for_status()
        data = r.json()
        # `or [{}]` also covers an empty list, which would otherwise raise
        # IndexError instead of yielding the "[No response]" fallback.
        candidates = data.get("candidates") or [{}]
        parts = candidates[0].get("content", {}).get("parts") or [{}]
        text = parts[0].get("text", "[No response]")
        elapsed = time.time() - start
        return {"response": text, "elapsed": round(elapsed, 1), "error": None}
    except Exception as e:
        return {"response": None, "elapsed": 0, "error": str(e)}
# query_openai_compatible function · python · L260-L283 (24 LOC)dex-council.py
def query_openai_compatible(prompt, api_key, url, model_id, timeout=120):
    """POST a single-user-message chat request to ``url`` with a Bearer key.

    Returns a dict with keys "response" (first choice's message content),
    "elapsed" (seconds, one decimal) and "error". Any exception is converted
    to ``{"response": None, "elapsed": 0, "error": str(exc)}``.
    """
    try:
        t0 = time.time()
        request_body = {
            "model": model_id,
            "messages": [{"role": "user", "content": prompt}],
            "temperature": 0.3,
            "max_tokens": 4096,
        }
        request_headers = {
            "Content-Type": "application/json",
            "Authorization": f"Bearer {api_key}",
        }
        resp = requests.post(url, json=request_body, headers=request_headers, timeout=timeout)
        resp.raise_for_status()
        first_choice = resp.json().get("choices", [{}])[0]
        content = first_choice.get("message", {}).get("content", "[No response]")
        duration = time.time() - t0
    except Exception as exc:
        return {"response": None, "elapsed": 0, "error": str(exc)}
    return {"response": content, "elapsed": round(duration, 1), "error": None}
# query_cloud function · python · L285-L292 (8 LOC)dex-council.py
def query_cloud(model, prompt, api_keys, timeout=120):
    """Dispatch a prompt to the cloud backend named by ``model["provider"]``.

    The key is looked up in ``api_keys`` under ``model["env_key"]``; a missing
    or empty key returns an error dict without making a request. Provider
    "gemini" goes to query_gemini, anything else to query_openai_compatible.
    """
    env_name = model["env_key"]
    api_key = api_keys.get(env_name, "")
    if not api_key:
        return {"response": None, "elapsed": 0, "error": f"Missing API key: {env_name}"}
    if model["provider"] != "gemini":
        return query_openai_compatible(prompt, api_key, model["url"], model["id"], timeout)
    return query_gemini(prompt, api_key, model["id"], timeout)
# build_governed_prompt function · python · L297-L302 (6 LOC)dex-council.py
def build_governed_prompt(prompt, rag_context=""):
    """Assemble GOVERNANCE, the optional retrieved context, and the prompt.

    The sections are joined with newlines; the context section is left out
    when ``rag_context`` is empty.
    """
    context_section = []
    if rag_context:
        context_section = [f"\nRETRIEVED CONTEXT (from DDL knowledge base):\n{rag_context}"]
    sections = [GOVERNANCE, *context_section, f"\nPROMPT:\n{prompt}"]
    return "\n".join(sections)
# synthesize function · python · L307-L339 (33 LOC)dex-council.py
def synthesize(prompt, responses, synthesizer=DEFAULT_SYNTHESIZER):
response_block = ""
for i, r in enumerate(responses):
if r["result"]["response"]:
response_block += f"\n{'='*60}\nMODEL {i+1}: {r['name']} [{r['provider'].upper()}]\n{'='*60}\n{r['result']['response']}\n"
else:
response_block += f"\n{'='*60}\nMODEL {i+1}: {r['name']} [{r['provider'].upper()}]\n{'='*60}\n[FAILED: {r['result']['error']}]\n"
synthesis_prompt = f"""You are the synthesis engine for a DDL Hybrid AutoCouncil review.
The following prompt was sent independently to {len(responses)} AI models across local and cloud tiers.
Each model received identical governance context from the DDL architecture.
Each model responded without seeing the others' responses.
ORIGINAL PROMPT:
{prompt}
MODEL RESPONSES:
{response_block}
SYNTHESIS INSTRUCTIONS:
1. CONVERGENCE: What did all or most models agree on?
2. DIVERGENCE: Where did models disagree? Note which model said whatsave_to_folder function · python · L344-L431 (88 LOC)dex-council.py
def save_to_folder(folder, prompt, responses, synthesis, rag_context=""):
os.makedirs(folder, exist_ok=True)
# Prompt
with open(os.path.join(folder, "00_prompt.txt"), "w", encoding="utf-8") as f:
f.write(prompt)
# RAG context
if rag_context:
with open(os.path.join(folder, "00_rag_context.txt"), "w", encoding="utf-8") as f:
f.write(rag_context)
# Individual responses
for i, r in enumerate(responses):
tier = "LOCAL" if r["provider"] == "local" else "CLOUD"
safe_name = r["name"].replace(" ", "_").replace("/", "-").replace("(", "").replace(")", "")
filename = f"{i+1:02d}_{tier}_{safe_name}.txt"
with open(os.path.join(folder, filename), "w", encoding="utf-8") as f:
f.write(f"MODEL: {r['name']}\n")
f.write(f"PROVIDER: {r['provider']}\n")
f.write(f"MODEL_ID: {r['model_id']}\n")
f.write(f"ELAPSED: {r['result']['elapsed']}s\n")
f.write(f"TIMESTAMRepobility's GitHub App fixes findings like these · https://github.com/apps/repobility-bot
auto_ingest function · python · L436-L475 (40 LOC)dex-council.py
def auto_ingest(folder):
"""Copy the full transcript to the canon folder and trigger ingestion."""
transcript = os.path.join(folder, "99_full_transcript.txt")
if not os.path.exists(transcript):
print(" [WARN] No transcript to ingest.")
return
canon_dir = r"C:\\Users\\dexjr\\99_DexUniverseArchive\\00_Archive\\AutoCouncil-Live"
os.makedirs(canon_dir, exist_ok=True)
# Generate unique filename
ts = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
basename = os.path.basename(folder).replace(" ", "_")
dest = os.path.join(canon_dir, f"AutoCouncil_{basename}_{ts}.txt")
try:
import shutil
shutil.copy2(transcript, dest)
print(f" Copied transcript to corpus: {dest}")
# Run ingestion
ingest_script = os.path.join(SCRIPT_DIR, "dex-ingest.py")
if os.path.exists(ingest_script):
print(" Running corpus ingestion...")
result = subprocess.run(
["python", iresynthesize function · python · L480-L552 (73 LOC)dex-council.py
def resynthesize(folder, synthesizer=DEFAULT_SYNTHESIZER):
prompt_path = os.path.join(folder, "00_prompt.txt")
if not os.path.exists(prompt_path):
print(f" ERROR: No prompt file found in {folder}")
return
with open(prompt_path, "r", encoding="utf-8") as f:
prompt = f.read().strip()
# Read response files
responses_text = []
for filename in sorted(os.listdir(folder)):
if filename.endswith(".txt") and not filename.startswith("00") and not filename.startswith("99"):
filepath = os.path.join(folder, filename)
with open(filepath, "r", encoding="utf-8") as f:
content = f.read()
lines = content.split("\n")
name = lines[0].replace("MODEL: ", "") if lines else "Unknown"
provider = lines[1].replace("PROVIDER: ", "") if len(lines) > 1 else "unknown"
body = ""
if "RESPONSE" in content:
parts = content.split("RESPONSE\n" + "=" *display_header function · python · L557-L576 (20 LOC)dex-council.py
def display_header(prompt, local_models, cloud_models, synthesizer, use_rag, save_path, ingest):
    """Print the run banner: prompt preview, model roster and active options.

    The prompt is shown up to 80 characters, with "..." appended when longer.
    The Local/Cloud, Save and Ingest lines appear only when applicable.
    """
    rule = "=" * 70
    model_total = len(local_models) + len(cloud_models)
    shown_prompt = prompt[:80] + ("..." if len(prompt) > 80 else "")

    banner = ["", rule, " DDL HYBRID AUTO-COUNCIL v3.0", rule, f" Prompt: {shown_prompt}"]
    for label, roster in (("Local", local_models), ("Cloud", cloud_models)):
        if roster:
            names = ", ".join(m["name"] for m in roster)
            banner.append(f" {label}: {len(roster)} ({names})")
    banner.append(f" Total: {model_total} models")
    banner.append(f" Synth: {synthesizer}")
    banner.append(f" RAG: {'Active' if use_rag else 'Off'}")
    if save_path:
        banner.append(f" Save: {save_path}")
    if ingest:
        banner.append(" Ingest: Auto-ingest to corpus after save")
    banner.extend([rule, ""])
    print("\n".join(banner))
# display_response function · python · L578-L592 (15 LOC)dex-council.py
def display_response(index, name, provider, result, verbose=False):
    """Print one model's outcome: a failure line, the full text, or a preview.

    Args:
        index: Zero-based position of the model; shown one-based.
        name: Display name of the model.
        provider: "local" is labelled LOCAL; anything else CLOUD.
        result: Dict with "response", "elapsed" and "error" keys.
        verbose: Print the whole response between rules instead of a
            150-character single-line preview.
    """
    tier = "LOCAL" if provider == "local" else "CLOUD"
    elapsed = f"{result['elapsed']}s" if result['elapsed'] else "—"
    if result["error"]:
        print(f" [{index+1}] [{tier}] {name} — FAILED ({result['error'][:60]})")
    else:
        # A result can carry response=None with no error; slicing None would
        # raise TypeError, so treat it as empty text.
        response = result["response"] or ""
        if verbose:
            print(f" [{index+1}] [{tier}] {name} ({elapsed})")
            print(f"{'─'*60}")
            print(response)
            print(f"{'─'*60}")
        else:
            preview = response[:150].replace("\n", " ")
            # Only mark the preview as truncated when something was cut off.
            suffix = "..." if len(response) > 150 else ""
            print(f" [{index+1}] [{tier}] {name} ({elapsed})")
            print(f" {preview}{suffix}")
    print()
# display_synthesis function · python · L594-L605 (12 LOC)dex-council.py
def display_synthesis(result, verbose=False):
    """Print the synthesis banner, then the synthesis text or its error.

    ``verbose`` is accepted but does not affect the output.
    """
    rule = "=" * 70
    for line in ("", rule, " SYNTHESIS (by Dexcell, Seat 1010)", rule, ""):
        print(line)
    failed = result.get("error")
    body = f" ERROR: {result['error']}" if failed else result["response"]
    print(body)
    print()
    print(rule)
# log_council function · python · L610-L627 (18 LOC)dex-council.py
def log_council(prompt, responses, synthesis, synthesizer, use_rag):
    """Append a one-line JSON summary of a council run to LOG_FILE.

    The entry records the timestamp, the first 500 characters of the prompt,
    the synthesizer, whether RAG was active, and counts of local, cloud,
    successful and failed responses. ``synthesis`` is accepted but not
    written. Logging is best-effort: a write failure is reported, not raised.
    """
    entry = {
        "timestamp": datetime.datetime.now().isoformat(),
        "version": "3.0",
        "prompt": prompt[:500],
        "synthesizer": synthesizer,
        "rag_active": use_rag,
        "model_count": len(responses),
        "local_count": sum(1 for r in responses if r["provider"] == "local"),
        "cloud_count": sum(1 for r in responses if r["provider"] != "local"),
        "successful": sum(1 for r in responses if r["result"]["response"]),
        "failed": sum(1 for r in responses if r["result"]["error"]),
    }
    try:
        with open(LOG_FILE, "a", encoding="utf-8") as f:
            f.write(json.dumps(entry, ensure_ascii=False) + "\n")
    except (OSError, TypeError, ValueError) as e:
        # A bare `except: pass` also swallowed KeyboardInterrupt/SystemExit
        # and hid every failure; keep going, but say what went wrong.
        print(f" [WARN] Could not write council log: {e}")
# main function · python · L632-L754 (123 LOC)dex-council.py
def main():
parser = argparse.ArgumentParser(description="DDL Hybrid AutoCouncil v3.0")
parser.add_argument("prompt", nargs="?", default=None, help="Prompt to send")
parser.add_argument("--local-only", action="store_true", help="Local only")
parser.add_argument("--cloud-only", action="store_true", help="Cloud only")
parser.add_argument("--all", action="store_true", help="Local + Cloud")
parser.add_argument("--synthesizer", default=DEFAULT_SYNTHESIZER)
parser.add_argument("--rag", action="store_true", help="Enable RAG")
parser.add_argument("--raw", action="store_true", help="Archive RAG")
parser.add_argument("--from-file", default=None, help="Prompt from file")
parser.add_argument("--top", type=int, default=TOP_K)
parser.add_argument("--no-governance", action="store_true")
parser.add_argument("--save", default=None, help="Save to folder")
parser.add_argument("--ingest", action="store_true", help="Auto-ingest to corpus")
parser.add_load_env function · python · L109-L118 (10 LOC)dex-deliberate.py
def load_env():
    """Read KEY=VALUE pairs from ENV_FILE into a dict.

    Lines that are blank, start with '#', or contain no '=' are skipped.
    Each remaining line is split on its first '=' and both sides are
    stripped. Returns an empty dict when ENV_FILE does not exist.
    """
    if not os.path.exists(ENV_FILE):
        return {}
    with open(ENV_FILE, "r") as fh:
        stripped = [ln.strip() for ln in fh]
    pairs = (
        ln.split("=", 1)
        for ln in stripped
        if ln and not ln.startswith("#") and "=" in ln
    )
    return {k.strip(): v.strip() for k, v in pairs}
# Repobility · code-quality intelligence · https://repobility.com
retrieve_context function · python · L123-L143 (21 LOC)dex-deliberate.py
def retrieve_context(query, top_k=TOP_K, use_raw=False):
try:
import chromadb
client = chromadb.PersistentClient(path=CHROMA_DIR)
collection = client.get_collection(RAW_COLLECTION if use_raw else CANON_COLLECTION)
r = requests.post(OLLAMA_EMBED_URL, json={"model": EMBED_MODEL, "prompt": query}, timeout=60)
r.raise_for_status()
embedding = r.json().get("embedding")
if not embedding:
return ""
results = collection.query(query_embeddings=[embedding], n_results=top_k, include=["documents", "metadatas"])
chunks = []
if results and results["documents"]:
for i, doc in enumerate(results["documents"][0]):
meta = results["metadatas"][0][i] if results["metadatas"] else {}
source = meta.get("source_file", "unknown")
chunks.append(f"[Source: {source}]\n{doc[:400]}")
return "\n\n".join(chunks)
except Exception as e:
print(f" query_local function · python · L148-L155 (8 LOC)dex-deliberate.py
def query_local(model_id, prompt, timeout=180):
    """POST a non-streaming generation request to OLLAMA_URL.

    Returns a dict with keys "response", "elapsed" (seconds, one decimal)
    and "error". Any exception is converted to
    ``{"response": None, "elapsed": 0, "error": str(exc)}``.
    """
    try:
        began = time.time()
        payload = {
            "model": model_id,
            "prompt": prompt,
            "stream": False,
            "options": {"temperature": 0.4, "num_ctx": 16384},
        }
        reply = requests.post(OLLAMA_URL, json=payload, timeout=timeout)
        reply.raise_for_status()
        answer = reply.json().get("response", "")
        took = round(time.time() - began, 1)
    except Exception as e:
        return {"response": None, "elapsed": 0, "error": str(e)}
    return {"response": answer, "elapsed": took, "error": None}
# query_gemini function · python · L157-L166 (10 LOC)dex-deliberate.py
def query_gemini(prompt, api_key, model_id="gemini-1.5-flash", timeout=120):
    """Send a prompt to the Gemini generateContent endpoint.

    Args:
        prompt: Text sent as the single content part.
        api_key: Gemini API key.
        model_id: Model name placed in the request URL.
        timeout: Request timeout in seconds. New optional parameter,
            mirroring query_local and query_openai_compat.

    Returns:
        A dict with keys "response", "elapsed" (seconds, one decimal) and
        "error". Any exception is converted to
        ``{"response": None, "elapsed": 0, "error": str(exc)}``.
    """
    # The key travels in a header, not the query string: request exceptions
    # embed the full URL in their message, and that message is returned in
    # "error", which would otherwise expose the key wherever it is shown.
    url = f"https://generativelanguage.googleapis.com/v1beta/models/{model_id}:generateContent"
    try:
        start = time.time()
        r = requests.post(
            url,
            json={"contents": [{"parts": [{"text": prompt}]}]},
            headers={"Content-Type": "application/json", "x-goog-api-key": api_key},
            timeout=timeout,
        )
        r.raise_for_status()
        # `or [{}]` also covers an empty list, which would otherwise raise
        # IndexError instead of yielding the empty-text fallback.
        candidates = r.json().get("candidates") or [{}]
        parts = candidates[0].get("content", {}).get("parts") or [{}]
        text = parts[0].get("text", "")
        return {"response": text, "elapsed": round(time.time() - start, 1), "error": None}
    except Exception as e:
        return {"response": None, "elapsed": 0, "error": str(e)}
# query_openai_compat function · python · L168-L176 (9 LOC)dex-deliberate.py
def query_openai_compat(prompt, api_key, url, model_id, timeout=120):
    """POST a single-user-message chat request to ``url`` with a Bearer key.

    Returns a dict with keys "response" (first choice's message content),
    "elapsed" (seconds, one decimal) and "error". Any exception is converted
    to ``{"response": None, "elapsed": 0, "error": str(exc)}``.
    """
    try:
        t0 = time.time()
        request_body = {
            "model": model_id,
            "messages": [{"role": "user", "content": prompt}],
            "temperature": 0.4,
            "max_tokens": 4096,
        }
        request_headers = {
            "Content-Type": "application/json",
            "Authorization": f"Bearer {api_key}",
        }
        resp = requests.post(url, json=request_body, headers=request_headers, timeout=timeout)
        resp.raise_for_status()
        first_choice = resp.json().get("choices", [{}])[0]
        content = first_choice.get("message", {}).get("content", "")
        duration = round(time.time() - t0, 1)
    except Exception as exc:
        return {"response": None, "elapsed": 0, "error": str(exc)}
    return {"response": content, "elapsed": duration, "error": None}
# query_model function · python · L178-L186 (9 LOC)dex-deliberate.py
def query_model(model, prompt, api_keys):
    """Route a prompt to the backend named by ``model["provider"]``.

    "local" goes to query_local. Other providers need a key found in
    ``api_keys`` under ``model["env_key"]``; without one an error dict is
    returned and no request is made. "gemini" then goes to query_gemini,
    anything else to query_openai_compat.
    """
    provider = model["provider"]
    if provider == "local":
        return query_local(model["id"], prompt)

    api_key = api_keys.get(model.get("env_key", ""), "")
    if not api_key:
        return {"response": None, "elapsed": 0, "error": f"Missing key: {model.get('env_key')}"}

    if provider != "gemini":
        return query_openai_compat(prompt, api_key, model["url"], model["id"])
    return query_gemini(prompt, api_key, model["id"])
# generate_followup function · python · L191-L215 (25 LOC)dex-deliberate.py
def generate_followup(topic, round_num, all_responses, synthesizer=DEFAULT_SYNTHESIZER):
response_block = ""
for r in all_responses:
if r["result"]["response"]:
response_block += f"\n--- {r['name']} ---\n{r['result']['response'][:800]}\n"
prompt = f"""You are the deliberation moderator for a DDL AutoCouncil session.
TOPIC: {topic}
The following responses were collected in Round {round_num}:
{response_block}
Your job:
1. Identify the 2-3 most important UNRESOLVED disagreements or open questions from this round.
2. For each, write a sharp follow-up question that forces the models to go deeper.
3. If any model made a claim without evidence, call it out and ask for specifics.
4. If the models are converging too quickly, introduce a devil's advocate angle.
Output ONLY the follow-up prompt that will be sent to all models for Round {round_num + 1}.
Do not include preamble or explanation. Just the follow-up prompt.
Keep it under 300 words."""
result = final_synthesis function · python · L220-L248 (29 LOC)dex-deliberate.py
def final_synthesis(topic, all_rounds, synthesizer=DEFAULT_SYNTHESIZER):
rounds_block = ""
for round_num, round_data in enumerate(all_rounds):
rounds_block += f"\n{'='*60}\nROUND {round_num + 1}\n{'='*60}\n"
rounds_block += f"PROMPT: {round_data['prompt'][:200]}\n\n"
for r in round_data["responses"]:
if r["result"]["response"]:
rounds_block += f"--- {r['name']} ({r['provider']}) ---\n{r['result']['response'][:600]}\n\n"
prompt = f"""You are producing the FINAL SYNTHESIS for a multi-round DDL AutoCouncil deliberation.
TOPIC: {topic}
DELIBERATION RECORD:
{rounds_block}
SYNTHESIS INSTRUCTIONS:
1. ARC: How did the discussion evolve across rounds? What shifted?
2. CONVERGENCE: What did the models ultimately agree on?
3. PERSISTENT DIVERGENCE: What remained unresolved even after multiple rounds?
4. KEY INSIGHT: What was the single most valuable insight across all rounds? Name the model and round.
5. RISKS IDENTIFIED: What risave_deliberation function · python · L253-L307 (55 LOC)dex-deliberate.py
def save_deliberation(folder, topic, all_rounds, final_synth, rag_context=""):
os.makedirs(folder, exist_ok=True)
# Topic
with open(os.path.join(folder, "00_topic.txt"), "w", encoding="utf-8") as f:
f.write(topic)
if rag_context:
with open(os.path.join(folder, "00_rag_context.txt"), "w", encoding="utf-8") as f:
f.write(rag_context)
# Each round
for round_num, round_data in enumerate(all_rounds):
round_dir = os.path.join(folder, f"round_{round_num + 1:02d}")
os.makedirs(round_dir, exist_ok=True)
with open(os.path.join(round_dir, "prompt.txt"), "w", encoding="utf-8") as f:
f.write(round_data["prompt"])
if round_data.get("followup"):
with open(os.path.join(round_dir, "followup.txt"), "w", encoding="utf-8") as f:
f.write(round_data["followup"])
for i, r in enumerate(round_data["responses"]):
tier = "LOCAL" if r["provider"] == "local" else "CLCitation: Repobility (2026). State of AI-Generated Code. https://repobility.com/research/
main function · python · L312-L480 (169 LOC)dex-deliberate.py
def main():
parser = argparse.ArgumentParser(description="DDL AutoCouncil Deliberation Engine v1.0")
parser.add_argument("topic", nargs="?", default=None, help="Deliberation topic")
parser.add_argument("--rounds", type=int, default=3, help="Number of deliberation rounds")
parser.add_argument("--local-only", action="store_true", help="Local models only")
parser.add_argument("--cloud-only", action="store_true", help="Cloud models only")
parser.add_argument("--all", action="store_true", help="Both local + cloud")
parser.add_argument("--rag", action="store_true", help="Enable RAG")
parser.add_argument("--raw", action="store_true", help="Use archive for RAG")
parser.add_argument("--from-file", default=None, help="Read topic from file")
parser.add_argument("--top", type=int, default=TOP_K, help="RAG chunks")
parser.add_argument("--save", default=None, help="Save deliberation to folder")
parser.add_argument("--synthesizer", default=DEFAULT_SYNTHHTMLStripper class · python · L82-L119 (38 LOC)dex-fetch.py
class HTMLStripper(HTMLParser):
def __init__(self):
super().__init__()
self.result = []
self.skip_tags = {'script', 'style', 'head', 'meta', 'link', 'noscript'}
self.current_skip = False
self.skip_depth = 0
def handle_starttag(self, tag, attrs):
if tag in self.skip_tags:
self.current_skip = True
self.skip_depth += 1
elif tag in ('br', 'hr'):
self.result.append('\n')
elif tag in ('p', 'div', 'h1', 'h2', 'h3', 'h4', 'h5', 'h6', 'li', 'tr', 'section', 'article'):
self.result.append('\n')
def handle_endtag(self, tag):
if tag in self.skip_tags:
self.skip_depth -= 1
if self.skip_depth <= 0:
self.current_skip = False
self.skip_depth = 0
elif tag in ('p', 'div', 'h1', 'h2', 'h3', 'h4', 'h5', 'h6', 'li', 'tr', 'section', 'article'):
self.result.append('\n')
def handle_data(self, data__init__ method · python · L83-L88 (6 LOC)dex-fetch.py
def __init__(self):
    """Initialise the parser with an empty output buffer and no active skip."""
    super().__init__()
    # Skip state: current_skip is the on/off flag, skip_depth the nesting count.
    self.skip_depth = 0
    self.current_skip = False
    # Tags that switch skipping on when opened.
    self.skip_tags = {'script', 'style', 'head', 'meta', 'link', 'noscript'}
    # Collected text fragments and newline markers, in document order.
    self.result = []
# handle_starttag method · python · L90-L97 (8 LOC)dex-fetch.py
def handle_starttag(self, tag, attrs):
    """Start skipping on a skip tag, or record a line break for a block tag.

    Args:
        tag: Tag name as passed in by HTMLParser.
        attrs: Attribute list; not used.
    """
    if tag in self.skip_tags:
        # <meta> and <link> are void elements: written as <meta ...> they get
        # no end tag, so counting them raised skip_depth permanently and every
        # later piece of text was dropped. Count them only in the self-closing
        # form (<meta ... />), where HTMLParser's handle_startendtag also
        # calls handle_endtag and the depth balances out again.
        if tag in ('meta', 'link'):
            start_text = self.get_starttag_text() or ''
            if not start_text.rstrip().endswith('/>'):
                return
        self.current_skip = True
        self.skip_depth += 1
    elif tag in ('br', 'hr', 'p', 'div', 'h1', 'h2', 'h3', 'h4', 'h5', 'h6',
                 'li', 'tr', 'section', 'article'):
        self.result.append('\n')
# handle_endtag method · python · L99-L106 (8 LOC)dex-fetch.py
def handle_endtag(self, tag):
    """Close one skip level for a skip tag, or add a line break for a block tag."""
    if tag not in self.skip_tags:
        if tag in ('p', 'div', 'h1', 'h2', 'h3', 'h4', 'h5', 'h6', 'li', 'tr', 'section', 'article'):
            self.result.append('\n')
        return

    self.skip_depth -= 1
    if self.skip_depth > 0:
        return
    # Outermost skip tag closed (or an unmatched end tag): reset the state.
    self.current_skip = False
    self.skip_depth = 0
# handle_data method · python · L108-L112 (5 LOC)dex-fetch.py
def handle_data(self, data):
    """Append the stripped text to the result unless skipping or it is blank."""
    if self.current_skip:
        return
    stripped = data.strip()
    if stripped:
        self.result.append(stripped)
# get_text method · python · L114-L119 (6 LOC)dex-fetch.py
def get_text(self):
    """Join the collected fragments with spaces and normalise whitespace."""
    text = ' '.join(self.result)
    # Applied in order: blank-line runs first, then runs of spaces.
    for pattern, replacement in ((r'\n\s*\n', '\n\n'), (r' +', ' ')):
        text = re.sub(pattern, replacement, text)
    return text.strip()
# strip_html function · python · L121-L124 (4 LOC)dex-fetch.py
def strip_html(html_content):
    """Feed html_content through an HTMLStripper and return its get_text() result."""
    parser = HTMLStripper()
    parser.feed(html_content)
    return parser.get_text()
# Source: Repobility analyzer · https://repobility.com
fetch_page function · python · L129-L144 (16 LOC)dex-fetch.py
def fetch_page(url, timeout=30):
    """GET ``url`` and return its body plus basic response metadata.

    Returns a dict with keys "html", "status", "content_type", "size"
    (character count of the body) and "error". Any exception, including a
    non-2xx status, yields html=None, status=0 and the message in "error".
    """
    try:
        resp = requests.get(
            url,
            headers={'User-Agent': 'DexJr-Fetch/1.0 (DDL Local AI; +https://dropdownlogistics.com)'},
            timeout=timeout,
        )
        resp.raise_for_status()
        body = resp.text
        return {
            "html": body,
            "status": resp.status_code,
            "content_type": resp.headers.get('content-type', ''),
            "size": len(body),
            "error": None,
        }
    except Exception as e:
        return {"html": None, "status": 0, "content_type": "", "size": 0, "error": str(e)}
# retrieve_context function · python · L149-L177 (29 LOC)dex-fetch.py
def retrieve_context(query, top_k=TOP_K):
try:
import chromadb
client = chromadb.PersistentClient(path=CHROMA_DIR)
collection = client.get_collection(CANON_COLLECTION)
r = requests.post(
OLLAMA_EMBED_URL,
json={"model": EMBED_MODEL, "prompt": query},
timeout=60,
)
r.raise_for_status()
embedding = r.json().get("embedding")
if not embedding:
return ""
results = collection.query(
query_embeddings=[embedding],
n_results=top_k,
include=["documents", "metadatas"],
)
chunks = []
if results and results["documents"]:
for i, doc in enumerate(results["documents"][0]):
meta = results["metadatas"][0][i] if results["metadatas"] else {}
source = meta.get("source_file", "unknown")
chunks.append(f"[Source: {source}]\n{doc[:400]}")
return "\n\n".join(chunkask_dexjr function · python · L182-L214 (33 LOC)dex-fetch.py
def ask_dexjr(page_text, question, rag_context="", model=DEFAULT_MODEL):
prompt_parts = [PAGE_GOVERNANCE]
if rag_context:
prompt_parts.append(f"\nRAG CONTEXT (from DDL corpus):\n{rag_context}")
prompt_parts.append(f"\nPAGE CONTENT:\n{page_text[:12000]}")
prompt_parts.append(f"\nQUESTION:\n{question}")
prompt_parts.append("\nAnalyze the page content and answer the question. Be specific. Cite exact elements from the page.")
prompt = "\n".join(prompt_parts)
try:
start = time.time()
r = requests.post(
OLLAMA_URL,
json={
"model": model,
"prompt": prompt,
"stream": False,
"options": {"temperature": 0.3, "num_ctx": 16384},
},
timeout=180,
)
r.raise_for_status()
elapsed = time.time() - start
return {
"response": r.json().get("response", "[No response]"),
"elapsed": round(elcrawl_sitemap function · python · L219-L270 (52 LOC)dex-fetch.py
def crawl_sitemap(base_url, save_dir):
"""Fetch sitemap.xml and crawl all pages."""
os.makedirs(save_dir, exist_ok=True)
sitemap_url = f"{base_url.rstrip('/')}/sitemap.xml"
print(f" Fetching sitemap: {sitemap_url}")
result = fetch_page(sitemap_url)
if result["error"]:
# Try sitemap-0.xml (common for Next.js)
sitemap_url = f"{base_url.rstrip('/')}/sitemap-0.xml"
print(f" Trying: {sitemap_url}")
result = fetch_page(sitemap_url)
if result["error"]:
print(f" ERROR: Could not fetch sitemap: {result['error']}")
print(f" Falling back to known routes...")
return []
# Extract URLs from sitemap XML
urls = re.findall(r'<loc>(.*?)</loc>', result["html"])
print(f" Found {len(urls)} URLs in sitemap")
fetched = []
for i, url in enumerate(urls):
print(f" [{i+1}/{len(urls)}] {url}...", end=" ", flush=True)
page = fetch_page(url)
if page["error"]:
print(fsave_text function · python · L275-L282 (8 LOC)dex-fetch.py
def save_text(text, url, filepath):
    """Write ``text`` to ``filepath`` preceded by a URL/FETCHED/SIZE header.

    Args:
        text: Page text to store.
        url: Source URL recorded in the header.
        filepath: Destination file; missing parent directories are created.
    """
    parent = os.path.dirname(filepath)
    # A bare filename has no directory part, and os.makedirs("") raises
    # FileNotFoundError, so only create the parent when there is one.
    if parent:
        os.makedirs(parent, exist_ok=True)
    with open(filepath, "w", encoding="utf-8") as f:
        f.write(f"URL: {url}\n")
        f.write(f"FETCHED: {datetime.datetime.now().isoformat()}\n")
        f.write(f"SIZE: {len(text)} chars\n\n")
        f.write(text)
    print(f" Saved to: {filepath}")
# ingest_text function · python · L284-L296 (13 LOC)dex-fetch.py
def ingest_text(text, url):
    """Save to canon folder for next ingestion sweep.

    Writes ``text`` with a URL/FETCHED/SIZE header to
    ``CANON_DIR/WebFetch_<slug>_<timestamp>.txt``, where the slug is derived
    from the URL.

    Args:
        text: Page text to store.
        url: Source URL; recorded in the header and used for the filename.
    """
    os.makedirs(CANON_DIR, exist_ok=True)
    slug = url.replace("https://", "").replace("http://", "")
    # Replacing only "/" left characters such as ? : * | in the name, which
    # are not allowed in Windows filenames and made open() fail for any URL
    # with a query string or port. Map every such character to "_".
    slug = re.sub(r'[<>:"/\\|?*\x00-\x1f]', "_", slug).rstrip("_")
    # Keep very long URLs from producing an over-long path.
    slug = slug[:150]
    ts = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
    filename = f"WebFetch_{slug}_{ts}.txt"
    filepath = os.path.join(CANON_DIR, filename)
    with open(filepath, "w", encoding="utf-8") as f:
        f.write(f"URL: {url}\n")
        f.write(f"FETCHED: {datetime.datetime.now().isoformat()}\n")
        f.write(f"SIZE: {len(text)} chars\n\n")
        f.write(text)
    print(f" Ingested to corpus: {filepath}")
# log_fetch function · python · L301-L314 (14 LOC)dex-fetch.py
def log_fetch(url, text_length, question, response_length, elapsed):
    """Append a one-line JSON record of a fetch to LOG_FILE.

    The record holds the timestamp, URL, page text length, the first 200
    characters of the question (or None), response length and elapsed time.
    Logging is best-effort: a write failure is reported, not raised.
    """
    entry = {
        "timestamp": datetime.datetime.now().isoformat(),
        "url": url,
        "text_chars": text_length,
        "question": question[:200] if question else None,
        "response_chars": response_length,
        "elapsed": elapsed,
    }
    try:
        with open(LOG_FILE, "a", encoding="utf-8") as f:
            f.write(json.dumps(entry, ensure_ascii=False) + "\n")
    except (OSError, TypeError, ValueError) as e:
        # A bare `except: pass` also swallowed KeyboardInterrupt/SystemExit
        # and hid every failure; keep going, but say what went wrong.
        print(f" [WARN] Could not write fetch log: {e}")
# display_result function · python · L319-L342 (24 LOC)dex-fetch.py
def display_result(url, text, question, result, rag_used):
    """Print the page-analysis report: header, answer or error, and footer.

    ``result`` may be falsy, in which case neither an answer nor the Elapsed
    line is printed. The question is shown up to 80 characters.
    """
    rule = "=" * 70

    print()
    print(rule)
    print(" DEX JR WEB FETCH — Page Analysis")
    print(rule)
    print(f" URL: {url}")
    print(f" Page text: {len(text)} chars")
    print(f" RAG: {'Active' if rag_used else 'Off'}")
    if question:
        print(f" Question: {question[:80]}")
    print(rule)
    print()

    answer = result.get("response") if result else None
    if answer:
        print(answer)
    elif result and result.get("error"):
        print(f" ERROR: {result['error']}")

    print()
    print(rule)
    if result:
        print(f" Elapsed: {result.get('elapsed', 0)}s")
    print(f" Log: {LOG_FILE}")
    print(rule)
# Repobility's GitHub App fixes findings like these · https://github.com/apps/repobility-bot
main function · python · L347-L425 (79 LOC)dex-fetch.py
def main():
parser = argparse.ArgumentParser(description="Dex Jr Web Fetch v1.0")
parser.add_argument("url", nargs="?", default=None, help="URL to fetch")
parser.add_argument("--ask", default=None, help="Question to ask about the page")
parser.add_argument("--rag", action="store_true", help="Also retrieve RAG context")
parser.add_argument("--raw", action="store_true", help="Just show stripped text")
parser.add_argument("--save", default=None, help="Save stripped text to file")
parser.add_argument("--ingest", action="store_true", help="Save to corpus for ingestion")
parser.add_argument("--model", default=DEFAULT_MODEL, help="LLM model to use")
parser.add_argument("--sitemap", default=None, help="Crawl sitemap from base URL")
parser.add_argument("--top", type=int, default=TOP_K, help="RAG chunks to retrieve")
args = parser.parse_args()
# Sitemap crawl mode
if args.sitemap:
save_dir = args.save or os.path.join(SCRIPT_DIR, "fetcclassify_tier function · python · L116-L124 (9 LOC)dex-ingest.py
def classify_tier(rel_path: str, filename: str, folder: str) -> Tuple[str, str]:
    """Map a file's location to a (tier, status) pair by substring markers.

    The three inputs are combined and lower-cased, then tested against
    CANON_PATH_MARKERS, FOUNDATION_PATH_MARKERS and ARCHIVE_PATH_MARKERS in
    that order; the first group with a matching marker decides the result.
    Returns ("unknown", "unknown") when nothing matches.
    """
    haystack = f"{rel_path} {filename} {folder}".lower()
    rules = (
        (CANON_PATH_MARKERS, ("canon", "ratified")),
        (FOUNDATION_PATH_MARKERS, ("foundation", "conceptual")),
        (ARCHIVE_PATH_MARKERS, ("archive", "historical")),
    )
    for markers, verdict in rules:
        if any(marker in haystack for marker in markers):
            return verdict
    return ("unknown", "unknown")
# infer_source_type function · python · L127-L154 (28 LOC)dex-ingest.py
def infer_source_type(filename: str, extension: str) -> str:
"""
Infer STD-DDL-METADATA-001 source_type enum value from filename + extension.
Extension-first for code/data/web. Filename-prefix override for text
files that are classifiable by naming convention (council reviews,
governance docs, synthesis, system telemetry). Fallback: "unknown".
"""
ext = (extension or "").lower()
if ext in CODE_EXTENSIONS:
return "code"
if ext == ".csv":
return "spreadsheet"
if ext in (".html", ".mhtml"):
return "web_archive"
# Text-file filename-prefix rules (.md / .txt)
name = filename or ""
if name.startswith("DDLCouncilReview_"):
return "council_review"
if name.startswith("SYNTH-") or "_SYNTH." in name:
return "council_synthesis"
if any(name.startswith(p) for p in ("ADR-", "STD-", "PRO-", "CR-")):
return "governance"
if name.startswith("sweep_") and name.endswith(".md"):
return "s