Function bodies 42 total
bench_file function · python · L14-L39 (26 LOC)benchmarks/run.py
def bench_file(path: Path) -> dict:
    """Benchmark one fixture file and summarise its decomposition.

    Reads ``path`` as text, times a single ``decompose_text`` call and
    returns a flat record of input size, timing and per-category unit counts.

    Args:
        path: Fixture file to read and decompose.

    Returns:
        Dict with the keys ``file``, ``input_chars``, ``input_words``,
        ``total_units``, ``processing_ms``, ``standards_found``,
        ``dates_found``, ``token_estimate``, ``mandatory_units``,
        ``safety_critical_units`` and ``irreducible_units``.
    """
    text = path.read_text()

    start = time.monotonic()
    result = decompose_text(text)
    elapsed = round((time.monotonic() - start) * 1000, 1)

    meta = result["meta"]
    units = result["units"]

    safety = sum(1 for u in units if u["risk"] == "safety_critical")
    mandatory = sum(1 for u in units if u["authority"] == "mandatory")
    irreducible = sum(1 for u in units if u["irreducible"])

    # For empty or oversized input decompose_text() returns a reduced meta
    # holding only "total_units" and "error", so the optional keys are read
    # with defaults instead of crashing the whole benchmark run on one
    # blank fixture.
    return {
        "file": path.name,
        "input_chars": len(text),
        "input_words": len(text.split()),
        "total_units": meta.get("total_units", len(units)),
        "processing_ms": elapsed,
        "standards_found": len(meta.get("standards_found", ())),
        "dates_found": len(meta.get("dates_found", ())),
        "token_estimate": meta.get("token_estimate", 0),
        "mandatory_units": mandatory,
        "safety_critical_units": safety,
        "irreducible_units": irreducible,
    }
# main function · python · L42-L75 (34 LOC)benchmarks/run.py
def main():
if not FIXTURES.exists():
print(f"No fixtures at {FIXTURES}", file=sys.stderr)
sys.exit(1)
files = sorted(FIXTURES.glob("*.txt"))
if not files:
print("No .txt fixtures found", file=sys.stderr)
sys.exit(1)
results = []
for f in files:
results.append(bench_file(f))
# Summary
total_chars = sum(r["input_chars"] for r in results)
total_ms = sum(r["processing_ms"] for r in results)
total_units = sum(r["total_units"] for r in results)
total_standards = sum(r["standards_found"] for r in results)
output = {
"benchmarks": results,
"summary": {
"files": len(results),
"total_chars": total_chars,
"total_units": total_units,
"total_standards": total_standards,
"total_ms": round(total_ms, 1),
"chars_per_ms": round(total_chars / max(total_ms, 0.1)),
},
}
json.dump(output, sys.stdout, indent=2)
prsafety_chain function · python · L25-L28 (4 LOC)examples/agent_router.py
def safety_chain(unit: dict):
    """Report a safety-critical unit: its text preview and its entities.

    Prints the first 70 characters of ``unit['text']`` and then the unit's
    ``entities`` list (an empty list when the key is absent).
    """
    preview = unit['text'][:70]
    print(f" [SAFETY] {preview}")
    found = unit.get('entities', [])
    print(f" entities: {found}")
# financial_chain function · python · L31-L34 (4 LOC)examples/agent_router.py
def financial_chain(unit: dict):
    """Report a financial-risk unit: its text preview and financial values.

    Prints the first 70 characters of ``unit['text']`` and then the unit's
    ``financial`` list (an empty list when the key is absent).
    """
    preview = unit['text'][:70]
    print(f" [FINANCIAL] {preview}")
    amounts = unit.get('financial', [])
    print(f" financial: {amounts}")
# compliance_chain function · python · L37-L39 (3 LOC)examples/agent_router.py
def compliance_chain(unit: dict):
    """Report a compliance-risk unit by printing a 70-character text preview."""
    preview = unit['text'][:70]
    print(f" [COMPLIANCE]{preview}")
# general_chain function · python · L42-L44 (3 LOC)examples/agent_router.py
def general_chain(unit: dict):
    """Report a general unit by printing a 70-character text preview."""
    preview = unit['text'][:70]
    print(f" [GENERAL] {preview}")
# main function · python · L47-L80 (34 LOC)examples/agent_router.py
def main():
if len(sys.argv) > 1:
with open(sys.argv[1]) as f:
text = f.read()
else:
text = SAMPLE_TEXT
result = decompose_text(text)
units = result["units"]
routed = {"safety": 0, "financial": 0, "compliance": 0, "general": 0, "skipped": 0}
for unit in units:
if unit["risk"] == "safety_critical":
safety_chain(unit)
routed["safety"] += 1
elif unit["risk"] == "financial":
financial_chain(unit)
routed["financial"] += 1
elif unit["risk"] == "compliance":
compliance_chain(unit)
routed["compliance"] += 1
elif unit["attention"] < 0.5:
print(f" [SKIP] {unit['text'][:70]}")
routed["skipped"] += 1
else:
general_chain(unit)
routed["general"] += 1
print(f"\n--- Routing Summary ---")
for chain, count in routed.items():
if count > 0:
print(f" {chain:12Open data scored by Repobility · https://repobility.com
main function · python · L25-L72 (48 LOC)examples/compliance_audit.py
def main():
if len(sys.argv) > 1:
with open(sys.argv[1]) as f:
text = f.read()
else:
text = SAMPLE_TEXT
result = decompose_text(text)
units = result["units"]
print(f"=== Compliance Audit Trail ===")
print(f"Input: {len(text)} characters")
print(f"Units: {len(units)}")
print()
for i, unit in enumerate(units, 1):
print(f"--- Unit {i} ---")
print(f" Text: {unit['text'][:80]}")
print(f" Authority: {unit['authority']}")
print(f" Risk: {unit['risk']}")
print(f" Attention: {unit['attention']}")
print(f" Irreducible: {unit.get('irreducible', False)}")
print(f" Actionable: {unit.get('actionable', False)}")
entities = unit.get("entities", [])
if entities:
print(f" Entities: {', '.join(entities)}")
financial = unit.get("financial", [])
if financial:
print(f" Financial: {', '.joinmain function · python · L36-L73 (38 LOC)examples/cost_calculator.py
def main():
if len(sys.argv) > 1:
with open(sys.argv[1]) as f:
text = f.read()
else:
text = SAMPLE_TEXT
result = decompose_text(text)
units = result["units"]
total_chars = sum(len(u["text"]) for u in units)
total_tokens = estimate_tokens(text)
# With Decompose: only send high-attention units to the LLM
high_attention = [u for u in units if u["attention"] >= 1.0]
filtered_chars = sum(len(u["text"]) for u in high_attention)
filtered_tokens = filtered_chars // CHARS_PER_TOKEN
reduction_pct = 100 - (filtered_tokens * 100 // total_tokens) if total_tokens > 0 else 0
print(f"--- Before Decompose (send everything to LLM) ---")
print(f" Units: {len(units)}")
print(f" Characters: {total_chars:,}")
print(f" Est tokens: {total_tokens:,}")
print(f"\n--- After Decompose (send attention >= 1.0 only) ---")
print(f" Units: {len(high_attention)}")
print(f" Characters: {filtered_chars:mock_embed function · python · L27-L29 (3 LOC)examples/rag_pipeline.py
def mock_embed(text: str) -> list[float]:
    """Placeholder embedder: returns an 8-dimensional zero vector.

    The input text is ignored; swap this out for a real embedding model.
    """
    return [0.0 for _ in range(8)]
# main function · python · L32-L59 (28 LOC)examples/rag_pipeline.py
def main():
    """Run the RAG pre-filter demo.

    Decomposes the text from the file named in ``sys.argv[1]`` (or
    ``SAMPLE_TEXT`` when no argument is given), embeds every unit whose
    attention is at least ``ATTENTION_THRESHOLD``, prints one line per unit
    and finishes with a summary.
    """
    # Load text from file or use the sample
    if len(sys.argv) > 1:
        with open(sys.argv[1]) as f:
            text = f.read()
    else:
        text = SAMPLE_TEXT

    result = decompose_text(text)
    units = result["units"]

    embedded = []
    skipped = []
    for unit in units:
        if unit["attention"] >= ATTENTION_THRESHOLD:
            # Unused in this demo; a real pipeline would persist the vector
            # alongside the unit.
            embedding = mock_embed(unit["text"])
            embedded.append(unit)
            print(f" EMBED [{unit['attention']:4.1f}] [{unit['authority']:12s}] {unit['text'][:70]}")
        else:
            skipped.append(unit)
            print(f" SKIP [{unit['attention']:4.1f}] [{unit['authority']:12s}] {unit['text'][:70]}")

    # An empty or whitespace-only input yields no units; report 0% instead of
    # dividing by zero.
    reduction_pct = len(skipped) * 100 // len(units) if units else 0

    print(f"\n--- Results ---")
    print(f"Total units: {len(units)}")
    print(f"Embedded: {len(embedded)} (attention >= {ATTENTION_THRESHOLD})")
    print(f"Skipped: {len(skipped)}")
    print(f"Token reduction: ~{reduction_pct}%")
# read_file function · python · L17-L29 (13 LOC)lab/run.py
def read_file(path: Path) -> str:
    """Read text from any supported file format.

    Args:
        path: File to read. A ``.pdf`` suffix (any case) selects PDF text
            extraction through PyMuPDF; everything else is read as text with
            undecodable bytes replaced.

    Returns:
        The file's text. For PDFs, the per-page text joined by blank lines,
        or ``""`` when PyMuPDF is not installed (best-effort: the caller then
        sees the file as empty).
    """
    if path.suffix.lower() != ".pdf":
        return path.read_text(errors="replace")

    # Guard only the optional import, so real extraction errors are not
    # mistaken for a missing dependency.
    try:
        import fitz  # PyMuPDF
    except ImportError:
        return ""

    doc = fitz.open(str(path))
    try:
        return "\n\n".join(page.get_text() for page in doc)
    finally:
        # Release the document even when a page fails to extract.
        doc.close()
# process_file function · python · L32-L102 (71 LOC)lab/run.py
def process_file(path: Path) -> dict:
"""Process a single file and return detailed results."""
text = read_file(path)
if not text.strip():
return {"file": path.name, "error": "empty"}
# Timing (100 iterations for stable measurement)
start = time.monotonic()
iterations = 100
for _ in range(iterations):
result = decompose_text(text)
avg_ms = round((time.monotonic() - start) * 1000 / iterations, 2)
units = result["units"]
meta = result["meta"]
# Unit breakdown
authority_dist = {}
risk_dist = {}
type_dist = {}
high_attention = []
irreducible_units = []
for i, u in enumerate(units):
auth = u["authority"]
authority_dist[auth] = authority_dist.get(auth, 0) + 1
risk = u.get("risk", "informational")
risk_dist[risk] = risk_dist.get(risk, 0) + 1
ctype = u.get("type", "narrative")
type_dist[ctype] = type_dist.get(ctype, 0) + 1
if u["attention"] >= 3.0:
print_report function · python · L105-L161 (57 LOC)lab/run.py
def print_report(results: list[dict]):
"""Print a human-readable report to stdout."""
print("=" * 70)
print("DECOMPOSE TESTING LAB — REPORT")
print("=" * 70)
total_chars = 0
total_words = 0
total_units = 0
total_ms = 0.0
for r in results:
if "error" in r:
print(f"\n {r['file']}: SKIPPED ({r['error']})")
continue
total_chars += r["chars"]
total_words += r["words"]
total_units += r["total_units"]
total_ms += r["avg_ms"]
print(f"\n{'─' * 70}")
print(f" {r['file']}")
print(f" {r['chars']:,} chars | {r['words']:,} words | {r['avg_ms']}ms")
print(f" {r['total_units']} units | "
f"{r['summary']['actionable']} actionable | "
f"{r['summary']['irreducible']} irreducible | "
f"max attention: {r['summary']['max_attention']}")
print(f"\n Authority: {r['authority_distribution']}")
print(f" Risk: {main function · python · L164-L255 (92 LOC)lab/run.py
def main():
# Find test documents
test_dir = Path(__file__).resolve().parent.parent / "tests" / "fixtures"
if not test_dir.exists():
print(f"No test directory at {test_dir}", file=sys.stderr)
sys.exit(1)
# Collect all text files recursively
files = sorted(
p for p in test_dir.rglob("*")
if p.suffix in (".md", ".txt", ".rst", ".pdf") and p.is_file()
)
if not files:
print(f"No documents found in {test_dir}", file=sys.stderr)
sys.exit(1)
print(f"Found {len(files)} documents in {test_dir}\n")
results = []
for f in files:
results.append(process_file(f))
# Print human-readable report
print_report(results)
# Save detailed results
root = Path(__file__).resolve().parent.parent
out_path = root / "lab" / "results.json"
with open(out_path, "w") as fp:
json.dump(results, fp, indent=2)
print(f"\nFull results saved to {out_path}")
# Build site-facing benchmarRepobility — same analyzer, your code, free for public repos · /scan/
Chunk class · python · L13-L22 (10 LOC)src/decompose/chunker.py
class Chunk:
    # One contiguous piece of input text produced by the chunkers in this
    # module.
    # NOTE(review): the fields use dataclasses.field(), so a @dataclass
    # decorator presumably sits just above this excerpt; confirm.

    # Sequence number of the chunk; the visible chunkers start counting at 1.
    chunk_id: int
    # The chunk's text (whitespace-stripped in the visible call sites).
    text: str
    # Start and end character offsets within the text being chunked.
    start: int
    end: int
    # Number of whitespace-separated words in `text`.
    word_count: int
    # Character count. NOTE(review): the single-chunk path of chunk_text
    # passes the unstripped length while chunk_markdown passes the stripped
    # length; confirm which one is intended.
    char_count: int
    # Markdown heading of the owning section, or None outside any heading.
    heading: str | None = None
    # Number of leading '#' characters of that heading; 0 when there is none.
    heading_level: int = 0
    # Heading path of the section; presumably the chain of ancestor headings
    # (TODO confirm against _parse_markdown_sections).
    heading_path: list[str] = field(default_factory=list)
# chunk_text function · python · L25-L66 (42 LOC)src/decompose/chunker.py
def chunk_text(text: str, chunk_size: int = DEFAULT_CHUNK_SIZE, overlap: int = DEFAULT_OVERLAP) -> list[Chunk]:
"""Split text into overlapping chunks, breaking at sentence boundaries."""
text = text.replace("\u00a0", " ")
if not text.strip():
return []
if len(text) <= chunk_size:
stripped = text.strip()
return [Chunk(
chunk_id=1, text=stripped, start=0, end=len(text),
word_count=len(stripped.split()), char_count=len(text),
)]
chunks: list[Chunk] = []
start = 0
cid = 1
while start < len(text):
end = min(start + chunk_size, len(text))
# Find a sentence boundary to break at
if end < len(text):
window = text[max(end - 150, start) : end]
for sep in (". ", ".\n", "! ", "? ", "\n\n"):
idx = window.rfind(sep)
if idx > -1:
end = max(end - 150, start) + idx + len(sep)
break
chun_parse_markdown_sections function · python · L69-L106 (38 LOC)src/decompose/chunker.py
def _parse_markdown_sections(text: str) -> list[dict]:
"""Parse markdown into sections delimited by ATX headers."""
header_re = re.compile(r"^(#{1,6})\s+(.+)$", re.MULTILINE)
matches = list(header_re.finditer(text))
if not matches:
return [{"heading": None, "level": 0, "parent": None, "path": [],
"text": text, "start": 0, "end": len(text)}]
sections = []
stack: list[tuple[int, str]] = []
# Preamble before first header
if matches[0].start() > 0:
pre = text[: matches[0].start()]
if pre.strip():
sections.append({"heading": None, "level": 0, "parent": None, "path": [],
"text": pre, "start": 0, "end": matches[0].start()})
for i, m in enumerate(matches):
level = len(m.group(1))
heading = m.group(2).strip()
sec_start = m.start()
sec_end = matches[i + 1].start() if i + 1 < len(matches) else len(text)
while stack and stack[-1][0] >= lchunk_markdown function · python · L109-L146 (38 LOC)src/decompose/chunker.py
def chunk_markdown(text: str, chunk_size: int = DEFAULT_CHUNK_SIZE, overlap: int = DEFAULT_OVERLAP) -> list[Chunk]:
"""Split markdown by header boundaries, sub-chunking oversized sections."""
if not text.strip():
return []
sections = _parse_markdown_sections(text)
if len(sections) == 1 and sections[0]["level"] == 0:
return chunk_text(text, chunk_size, overlap)
chunks: list[Chunk] = []
cid = 1
for sec in sections:
sec_text = sec["text"].strip()
if not sec_text:
continue
if len(sec_text) <= chunk_size:
chunks.append(Chunk(
chunk_id=cid, text=sec_text, start=sec["start"], end=sec["end"],
word_count=len(sec_text.split()), char_count=len(sec_text),
heading=sec["heading"], heading_level=sec["level"], heading_path=sec["path"],
))
cid += 1
else:
sub = chunk_text(sec_text, chunk_size, overlap)
forauto_chunk function · python · L149-L153 (5 LOC)src/decompose/chunker.py
def auto_chunk(text: str, chunk_size: int = DEFAULT_CHUNK_SIZE, overlap: int = DEFAULT_OVERLAP) -> list[Chunk]:
    """Pick a chunker based on the text's format and run it.

    Text containing at least one line that starts with 1-6 ``#`` characters
    followed by whitespace is treated as markdown and sent to
    ``chunk_markdown``; everything else goes to ``chunk_text``.
    """
    looks_like_markdown = re.search(r"^#{1,6}\s+", text, re.MULTILINE) is not None
    chunker = chunk_markdown if looks_like_markdown else chunk_text
    return chunker(text, chunk_size, overlap)
# Classification class · python · L108-L115 (8 LOC)src/decompose/classifier.py
class Classification:
    # Result of classify() for one text passage.
    # NOTE(review): presumably decorated with @dataclass just above this
    # excerpt; confirm.

    # Top-scoring authority label; "informational" when nothing matched.
    authority: str = "informational"
    # Score of that label (classify() caps it at 10.0 and rounds it).
    authority_score: float = 0.0
    # Top-scoring risk label; "informational" when nothing matched.
    risk: str = "informational"
    # Score of that risk label (classify() caps it at 10.0).
    risk_score: float = 0.0
    # Top-scoring content-type label.
    content_type: str = "narrative"
    # True when authority is mandatory/prohibitive/directive or risk is
    # safety_critical/security/compliance.
    actionable: bool = False
    # Authority score (capped at 5.0) times a per-risk multiplier, capped at
    # 10.0 and rounded to one decimal.
    attention: float = 0.0
# _score_patterns function · python · L118-L136 (19 LOC)src/decompose/classifier.py
def _score_patterns(text: str, patterns: dict[str, dict | list[str]], use_weight: bool = False) -> tuple[str, float]:
"""Score text against a pattern dict. Returns (top_label, score)."""
scores: dict[str, float] = {}
text_lower = text.lower() if len(text) < 50_000 else text[:50_000].lower()
for label, value in patterns.items():
pats = value["patterns"] if isinstance(value, dict) else value
weight = value.get("weight", 1.0) if isinstance(value, dict) else 1.0
count = 0
for p in pats:
count += len(re.findall(p, text_lower, re.IGNORECASE))
if count > 0:
scores[label] = count * weight
if not scores:
return ("informational", 0.0)
top = max(scores, key=scores.get) # type: ignore[arg-type]
return (top, scores[top])classify function · python · L139-L166 (28 LOC)src/decompose/classifier.py
def classify(text: str) -> Classification:
"""Classify a text passage. Pure regex, no LLM, deterministic."""
authority, auth_score = _score_patterns(text, AUTHORITY_PATTERNS, use_weight=True)
risk, risk_score = _score_patterns(text, RISK_PATTERNS)
content_type, _ = _score_patterns(text, CONTENT_TYPE_PATTERNS)
# Attention score: risk multiplier * normalized authority score
risk_mult = {
"safety_critical": 4.0, "security": 3.0, "compliance": 2.0,
"financial": 1.5, "contractual": 1.5, "advisory": 0.5,
"informational": 0.3,
}.get(risk, 0.5)
attention = min(10.0, round(min(auth_score, 5.0) * risk_mult, 1))
actionable = (
authority in ("mandatory", "prohibitive", "directive")
or risk in ("safety_critical", "security", "compliance")
)
return Classification(
authority=authority,
authority_score=round(min(auth_score, 10.0), 2),
risk=risk,
risk_score=round(min(risk_score, 10.0),If a scraper extracted this row, it came from Repobility (https://repobility.com)
main function · python · L13-L51 (39 LOC)src/decompose/cli.py
def main():
parser = argparse.ArgumentParser(
prog="decompose",
description="Stop prompting. Start decomposing. Structured intelligence from any text.",
)
parser.add_argument("--text", "-t", help="Text to decompose (or pipe via stdin)")
parser.add_argument("--compact", "-c", action="store_true", help="Compact output (omit zero-value fields)")
parser.add_argument("--chunk-size", type=int, default=2000, help="Max characters per unit (default: 2000)")
parser.add_argument("--pretty", "-p", action="store_true", help="Pretty-print JSON output")
parser.add_argument("--serve", action="store_true", help="Run as MCP server (stdio)")
parser.add_argument("--version", "-v", action="store_true", help="Print version")
args = parser.parse_args()
if args.version:
from decompose import __version__
print(f"decompose {__version__}")
return
if args.serve:
from decompose.mcp_server import serve
asyncio.runUnit class · python · L16-L28 (13 LOC)src/decompose/core.py
class Unit:
    """A single semantic unit — the atomic output of decompose."""
    # NOTE(review): presumably decorated with @dataclass just above this
    # excerpt (heading_path uses dataclasses.field); confirm.

    # The unit's text.
    text: str
    # Authority label, e.g. "mandatory".
    authority: str
    # Risk label, e.g. "safety_critical", "financial" or "compliance".
    risk: str
    # Content-type label.
    content_type: str
    # Whether the text was flagged as irreducible.
    irreducible: bool
    # Attention score; callers in this project compare it against thresholds.
    attention: float
    # Whether the unit was classified as actionable.
    actionable: bool
    # Entity strings extracted from the text.
    entities: list[str]
    # Heading of the section the unit came from, if any.
    heading: str | None = None
    # Heading path of that section.
    heading_path: list[str] = field(default_factory=list)
# DecomposeResult class · python · L32-L36 (5 LOC)src/decompose/core.py
class DecomposeResult:
    """Complete decompose output."""
    # NOTE(review): decompose_text() is annotated as returning a plain dict
    # with 'units' and 'meta' keys; whether it builds this class internally
    # is not visible here. Confirm.

    # The classified units.
    units: list[Unit]
    # Summary metadata for the run (e.g. total_units).
    meta: dict = field(default_factory=dict)
# decompose_text function · python · L39-L148 (110 LOC)src/decompose/core.py
def decompose_text(
text: str,
*,
chunk_size: int = 2000,
overlap: int = 200,
compact: bool = False,
) -> dict:
"""Decompose text into classified semantic units.
Args:
text: Raw input text.
chunk_size: Maximum characters per chunk.
overlap: Character overlap between chunks.
compact: If True, omit zero-value fields for smaller output.
Returns:
Dictionary with 'units' list and 'meta' summary.
"""
start = time.monotonic()
if not text or not text.strip():
return {"units": [], "meta": {"total_units": 0, "error": "empty_input"}}
MAX_INPUT = 10_000_000 # 10 MB
if len(text) > MAX_INPUT:
return {"units": [], "meta": {"total_units": 0, "error": "input_too_large", "max_bytes": MAX_INPUT}}
chunk_size = max(100, min(chunk_size, 100_000))
overlap = max(0, min(overlap, chunk_size // 2))
# Chunk
chunks = auto_chunk(text, chunk_size=chunk_size, overlap=overlap)
# Classfilter_for_llm function · python · L155-L235 (81 LOC)src/decompose/core.py
def filter_for_llm(
result: dict,
*,
authorities: tuple[str, ...] = ("mandatory", "prohibitive", "directive", "conditional"),
risks: tuple[str, ...] = ("safety_critical", "compliance", "financial", "contractual"),
types: tuple[str, ...] = ("requirement", "constraint", "data", "definition"),
min_attention: float = 0.0,
include_headings: bool = True,
max_tokens: int = 0,
) -> dict:
"""Filter decompose result to high-value units for LLM consumption.
Takes the output of decompose_text() and returns only units that
match the specified authority, risk, or content type criteria.
Pattern proven in RBS Policy QC: Decompose as pre-filter before LLM
extraction dramatically improves quality and reduces hallucination.
Args:
result: Output from decompose_text().
authorities: Authority levels to keep.
risks: Risk levels to keep.
types: Content types to keep.
min_attention: Minimum attention score (0.0 = Entities class · python · L41-L45 (5 LOC)src/decompose/entities.py
class Entities:
    # Structured entities pulled out of a text passage by extract_entities().
    # NOTE(review): presumably decorated with @dataclass just above this
    # excerpt (every field uses dataclasses.field); confirm.

    # Standard / building-code / OSHA citations, de-duplicated in first-seen
    # order.
    standards: list[str] = field(default_factory=list)
    # Date strings exactly as matched, de-duplicated in first-seen order.
    dates: list[str] = field(default_factory=list)
    # Dollar amounts formatted as "$<amount>" and percentages as "<n>%".
    financial: list[str] = field(default_factory=list)
    # CFR references as matched.
    references: list[str] = field(default_factory=list)
# extract_entities function · python · L48-L82 (35 LOC)src/decompose/entities.py
def extract_entities(text: str) -> Entities:
"""Extract structured entities from text. Pure regex, deterministic."""
standards: list[str] = []
dates: list[str] = []
financial: list[str] = []
references: list[str] = []
# Standards
for rx in (_STANDARD_US, _STANDARD_INTL, _BUILDING_CODE, _OSHA):
for m in rx.finditer(text):
standards.append(m.group(0).strip())
# CFR references
for m in _CFR.finditer(text):
references.append(m.group(0).strip())
# Dates
for m in _DATE_MDY.finditer(text):
dates.append(m.group(0))
for m in _DATE_WRITTEN.finditer(text):
dates.append(m.group(0))
# Financial
for m in _DOLLAR.finditer(text):
financial.append(f"${m.group(1)}")
for m in _PERCENT.finditer(text):
financial.append(f"{m.group(1)}%")
# Deduplicate preserving order
return Entities(
standards=list(dict.fromkeys(standards)),
dates=list(dict.fromkeys(dates)),
IrreducibilityResult class · python · L23-L28 (6 LOC)src/decompose/irreducibility.py
class IrreducibilityResult:
    # Outcome of detect_irreducibility() for one text passage.
    # NOTE(review): presumably decorated with @dataclass just above this
    # excerpt (categories uses dataclasses.field); confirm.

    # True when at least one irreducible pattern matched.
    irreducible: bool
    # 0.2 per match, capped at 1.0 and rounded to three decimals.
    confidence: float
    # "PRESERVE_VERBATIM", "PRESERVE_KEY_VALUES" or "SUMMARIZABLE".
    recommendation: str
    # Sorted names of the pattern categories that matched.
    categories: list[str] = field(default_factory=list)
    # Total number of pattern matches.
    match_count: int = 0
# Provenance: Repobility (https://repobility.com) — every score reproducible from /scan/
detect_irreducibility function · python · L31-L57 (27 LOC)src/decompose/irreducibility.py
def detect_irreducibility(text: str) -> IrreducibilityResult:
    """Decide whether a passage must be preserved rather than summarised.

    Every pattern in ``IRREDUCIBLE_PATTERNS`` is searched case-insensitively.
    Each match adds 0.2 to the confidence (capped at 1.0), and the confidence
    selects the recommendation: at least 0.6 gives ``PRESERVE_VERBATIM``, at
    least 0.3 gives ``PRESERVE_KEY_VALUES``, anything lower gives
    ``SUMMARIZABLE``.
    """
    hit_count = 0
    found_categories: set[str] = set()

    for pattern, category in IRREDUCIBLE_PATTERNS:
        hits = sum(1 for _ in re.finditer(pattern, text, re.IGNORECASE))
        if hits:
            hit_count += hits
            found_categories.add(category)

    confidence = min(1.0, hit_count * 0.2)

    if confidence < 0.3:
        recommendation = "SUMMARIZABLE"
    elif confidence < 0.6:
        recommendation = "PRESERVE_KEY_VALUES"
    else:
        recommendation = "PRESERVE_VERBATIM"

    return IrreducibilityResult(
        irreducible=hit_count > 0,
        confidence=round(confidence, 3),
        recommendation=recommendation,
        categories=sorted(found_categories),
        match_count=hit_count,
    )
# _validate_url function · python · L33-L49 (17 LOC)src/decompose/mcp_server.py
def _validate_url(url: str) -> None:
"""Reject URLs targeting internal/private networks or non-HTTP schemes."""
parsed = urlparse(url)
if parsed.scheme not in ("http", "https"):
raise ValueError(f"Unsupported URL scheme: {parsed.scheme!r}")
hostname = parsed.hostname
if not hostname:
raise ValueError("URL has no hostname")
try:
addrs = socket.getaddrinfo(hostname, None, socket.AF_UNSPEC, socket.SOCK_STREAM)
except socket.gaierror as e:
raise ValueError(f"Cannot resolve hostname: {e}") from None
for _, _, _, _, sockaddr in addrs:
ip = ipaddress.ip_address(sockaddr[0])
for net in _BLOCKED_NETS:
if ip in net:
raise ValueError(f"URL resolves to blocked address: {ip}")_HTMLToText class · python · L54-L76 (23 LOC)src/decompose/mcp_server.py
class _HTMLToText(HTMLParser):
"""Minimal HTML-to-text converter. No dependencies."""
def __init__(self):
super().__init__()
self._parts: list[str] = []
self._skip = False
def handle_starttag(self, tag, attrs):
self._skip = tag in ("script", "style", "nav", "footer", "header")
if tag in ("p", "br", "div", "h1", "h2", "h3", "h4", "h5", "h6", "li", "tr"):
self._parts.append("\n")
def handle_endtag(self, tag):
if tag in ("script", "style", "nav", "footer", "header"):
self._skip = False
def handle_data(self, data):
if not self._skip:
self._parts.append(data)
def get_text(self) -> str:
return "".join(self._parts).strip()__init__ method · python · L57-L60 (4 LOC)src/decompose/mcp_server.py
def __init__(self):
super().__init__()
self._parts: list[str] = []
self._skip = Falsehandle_starttag method · python · L62-L65 (4 LOC)src/decompose/mcp_server.py
def handle_starttag(self, tag, attrs):
self._skip = tag in ("script", "style", "nav", "footer", "header")
if tag in ("p", "br", "div", "h1", "h2", "h3", "h4", "h5", "h6", "li", "tr"):
self._parts.append("\n")handle_endtag method · python · L67-L69 (3 LOC)src/decompose/mcp_server.py
def handle_endtag(self, tag):
if tag in ("script", "style", "nav", "footer", "header"):
self._skip = Falsehandle_data method · python · L71-L73 (3 LOC)src/decompose/mcp_server.py
def handle_data(self, data):
if not self._skip:
self._parts.append(data)_fetch_url function · python · L79-L96 (18 LOC)src/decompose/mcp_server.py
class _ValidatingRedirectHandler(urllib.request.HTTPRedirectHandler):
    """Re-run ``_validate_url`` on every redirect target before following it."""

    def redirect_request(self, req, fp, code, msg, headers, newurl):
        # Without this an allowed public URL could redirect the fetch to an
        # internal address that the initial validation would have refused.
        _validate_url(newurl)
        return super().redirect_request(req, fp, code, msg, headers, newurl)


def _fetch_url(url: str, timeout: int = 15, max_bytes: int = 10_000_000) -> str:
    """Fetch URL content, convert HTML to plain text. Stdlib only.

    Args:
        url: http(s) URL to fetch; it and every redirect target must pass
            ``_validate_url``.
        timeout: Timeout in seconds passed to the opener.
        max_bytes: Largest response body accepted.

    Returns:
        The body unchanged when the Content-Type mentions text/markdown or
        text/plain; otherwise the body run through ``_HTMLToText``.

    Raises:
        ValueError: If the URL or a redirect target is rejected, or the body
            is larger than ``max_bytes``.
        urllib.error.URLError, OSError: On network or HTTP failures.
    """
    _validate_url(url)
    req = urllib.request.Request(
        url,
        headers={"User-Agent": "decompose/0.1", "Accept": "text/markdown, text/plain, text/html"},
    )
    opener = urllib.request.build_opener(_ValidatingRedirectHandler)
    with opener.open(req, timeout=timeout) as resp:
        content_type = resp.headers.get("Content-Type", "")
        charset = resp.headers.get_content_charset() or "utf-8"
        # Read one byte past the limit so an oversized body is detected
        # without buffering all of it.
        raw = resp.read(max_bytes + 1)

    if len(raw) > max_bytes:
        raise ValueError(f"Response body exceeds {max_bytes} bytes")

    try:
        body = raw.decode(charset, errors="replace")
    except LookupError:
        # The server declared a charset Python does not know.
        body = raw.decode("utf-8", errors="replace")

    if "text/markdown" in content_type or "text/plain" in content_type:
        return body

    # HTML → plain text
    parser = _HTMLToText()
    parser.feed(body)
    parser.close()  # flush any text still buffered by the parser
    return parser.get_text()
# Open data scored by Repobility · https://repobility.com
list_tools function · python · L100-L137 (38 LOC)src/decompose/mcp_server.py
async def list_tools() -> list[Tool]:
return [
Tool(
name="decompose_text",
description=(
"Decompose text into classified semantic units with authority levels, "
"risk scores, entity extraction, and irreducibility flags. "
"No LLM required. Deterministic. Returns structured JSON."
),
inputSchema={
"type": "object",
"properties": {
"text": {"type": "string", "description": "The text to decompose"},
"compact": {"type": "boolean", "description": "Omit zero-value fields", "default": False},
"chunk_size": {
"type": "integer", "description": "Max chars per unit (100-100000)",
"default": 2000, "minimum": 100, "maximum": 100000,
},
},
"required": ["text"],
},
),
Tool(
call_tool function · python · L141-L161 (21 LOC)src/decompose/mcp_server.py
async def call_tool(name: str, arguments: dict) -> list[TextContent]:
    """Dispatch an MCP tool call and return its result as JSON text.

    Args:
        name: Tool name: ``"decompose_text"`` or ``"decompose_url"``.
        arguments: Tool arguments. ``decompose_text`` reads ``text``
            (required), ``compact`` and ``chunk_size``; ``decompose_url``
            reads ``url`` (required) and ``compact``.

    Returns:
        A single-element list holding the JSON-encoded decompose result, or a
        JSON object with an ``"error"`` key for an unknown tool, a missing
        required argument, or a failed fetch.
    """
    import asyncio  # local import: only needed to offload the blocking fetch

    if name == "decompose_text":
        if "text" not in arguments:
            return [TextContent(type="text", text=json.dumps({"error": "Missing required argument: text"}))]
        result = decompose_text(
            arguments["text"],
            compact=arguments.get("compact", False),
            chunk_size=arguments.get("chunk_size", 2000),
        )
        return [TextContent(type="text", text=json.dumps(result, indent=2))]

    if name == "decompose_url":
        if "url" not in arguments:
            return [TextContent(type="text", text=json.dumps({"error": "Missing required argument: url"}))]
        url = arguments["url"]
        try:
            # _fetch_url does blocking DNS and HTTP I/O; run it in a worker
            # thread so the event loop stays responsive during the fetch.
            text = await asyncio.to_thread(_fetch_url, url)
        except (urllib.error.URLError, TimeoutError, OSError, ValueError) as e:
            return [TextContent(type="text", text=json.dumps({"error": f"Failed to fetch URL: {e}"}))]
        result = decompose_text(text, compact=arguments.get("compact", False))
        result["meta"]["source_url"] = url
        return [TextContent(type="text", text=json.dumps(result, indent=2))]

    return [TextContent(type="text", text=json.dumps({"error": f"Unknown tool: {name}"}))]
# serve function · python · L164-L167 (4 LOC)src/decompose/mcp_server.py
async def serve():
    """Serve MCP requests over stdin/stdout until the server's run loop ends."""
    async with stdio_server() as streams:
        reader, writer = streams
        init_options = server.create_initialization_options()
        await server.run(reader, writer, init_options)