← back to echology-io__decompose

Function bodies 42 total

All specs Real LLM only Function bodies
bench_file function · python · L14-L39 (26 LOC)
benchmarks/run.py
def bench_file(path: Path) -> dict:
    """Run one timed decompose pass over a fixture file and summarize it.

    Args:
        path: Text fixture to read and decompose.

    Returns:
        A flat dict with the file name, input size (characters and
        whitespace-separated words), elapsed wall time in milliseconds
        (rounded to 0.1 ms), the unit/entity counts reported in the result
        meta, and per-unit tallies for mandatory, safety-critical and
        irreducible units.
    """
    text = path.read_text()
    start = time.monotonic()
    result = decompose_text(text)
    elapsed = round((time.monotonic() - start) * 1000, 1)

    meta = result["meta"]
    units = result["units"]

    safety = sum(1 for u in units if u["risk"] == "safety_critical")
    mandatory = sum(1 for u in units if u["authority"] == "mandatory")
    irreducible = sum(1 for u in units if u["irreducible"])

    return {
        "file": path.name,
        "input_chars": len(text),
        "input_words": len(text.split()),
        "total_units": meta["total_units"],
        "processing_ms": elapsed,
        # decompose_text() returns a reduced meta (only total_units + error)
        # for empty or oversized input, so these keys may be absent; report
        # zeros instead of aborting the whole benchmark run with a KeyError.
        "standards_found": len(meta.get("standards_found", ())),
        "dates_found": len(meta.get("dates_found", ())),
        "token_estimate": meta.get("token_estimate", 0),
        "mandatory_units": mandatory,
        "safety_critical_units": safety,
        "irreducible_units": irreducible,
    }
main function · python · L42-L75 (34 LOC)
benchmarks/run.py
def main():
    if not FIXTURES.exists():
        print(f"No fixtures at {FIXTURES}", file=sys.stderr)
        sys.exit(1)

    files = sorted(FIXTURES.glob("*.txt"))
    if not files:
        print("No .txt fixtures found", file=sys.stderr)
        sys.exit(1)

    results = []
    for f in files:
        results.append(bench_file(f))

    # Summary
    total_chars = sum(r["input_chars"] for r in results)
    total_ms = sum(r["processing_ms"] for r in results)
    total_units = sum(r["total_units"] for r in results)
    total_standards = sum(r["standards_found"] for r in results)

    output = {
        "benchmarks": results,
        "summary": {
            "files": len(results),
            "total_chars": total_chars,
            "total_units": total_units,
            "total_standards": total_standards,
            "total_ms": round(total_ms, 1),
            "chars_per_ms": round(total_chars / max(total_ms, 0.1)),
        },
    }

    json.dump(output, sys.stdout, indent=2)
    pr
safety_chain function · python · L25-L28 (4 LOC)
examples/agent_router.py
def safety_chain(unit: dict):
    """Demo handler for safety-critical units (stand-in for full analysis + human review).

    Prints the first 70 characters of the unit text under a [SAFETY] tag,
    followed by the unit's entity list (empty list when the key is absent).
    """
    snippet = unit['text'][:70]
    print(f"  [SAFETY]    {snippet}")
    found_entities = unit.get('entities', [])
    print(f"              entities: {found_entities}")
financial_chain function · python · L31-L34 (4 LOC)
examples/agent_router.py
def financial_chain(unit: dict):
    """Demo handler for financial units (stand-in for a finance-team review flag).

    Prints the first 70 characters of the unit text under a [FINANCIAL] tag,
    followed by the unit's financial values (empty list when the key is absent).
    """
    snippet = unit['text'][:70]
    print(f"  [FINANCIAL] {snippet}")
    amounts = unit.get('financial', [])
    print(f"              financial: {amounts}")
compliance_chain function · python · L37-L39 (3 LOC)
examples/agent_router.py
def compliance_chain(unit: dict):
    """Demo handler for compliance units: print the first 70 characters of the text under a [COMPLIANCE] tag."""
    snippet = unit['text'][:70]
    print(f"  [COMPLIANCE]{snippet}")
general_chain function · python · L42-L44 (3 LOC)
examples/agent_router.py
def general_chain(unit: dict):
    """Demo handler for everything else: print the first 70 characters of the text under a [GENERAL] tag."""
    snippet = unit['text'][:70]
    print(f"  [GENERAL]   {snippet}")
main function · python · L47-L80 (34 LOC)
examples/agent_router.py
def main():
    if len(sys.argv) > 1:
        with open(sys.argv[1]) as f:
            text = f.read()
    else:
        text = SAMPLE_TEXT

    result = decompose_text(text)
    units = result["units"]

    routed = {"safety": 0, "financial": 0, "compliance": 0, "general": 0, "skipped": 0}

    for unit in units:
        if unit["risk"] == "safety_critical":
            safety_chain(unit)
            routed["safety"] += 1
        elif unit["risk"] == "financial":
            financial_chain(unit)
            routed["financial"] += 1
        elif unit["risk"] == "compliance":
            compliance_chain(unit)
            routed["compliance"] += 1
        elif unit["attention"] < 0.5:
            print(f"  [SKIP]      {unit['text'][:70]}")
            routed["skipped"] += 1
        else:
            general_chain(unit)
            routed["general"] += 1

    print(f"\n--- Routing Summary ---")
    for chain, count in routed.items():
        if count > 0:
            print(f"  {chain:12
Open data scored by Repobility · https://repobility.com
main function · python · L25-L72 (48 LOC)
examples/compliance_audit.py
def main():
    if len(sys.argv) > 1:
        with open(sys.argv[1]) as f:
            text = f.read()
    else:
        text = SAMPLE_TEXT

    result = decompose_text(text)
    units = result["units"]

    print(f"=== Compliance Audit Trail ===")
    print(f"Input: {len(text)} characters")
    print(f"Units: {len(units)}")
    print()

    for i, unit in enumerate(units, 1):
        print(f"--- Unit {i} ---")
        print(f"  Text:         {unit['text'][:80]}")
        print(f"  Authority:    {unit['authority']}")
        print(f"  Risk:         {unit['risk']}")
        print(f"  Attention:    {unit['attention']}")
        print(f"  Irreducible:  {unit.get('irreducible', False)}")
        print(f"  Actionable:   {unit.get('actionable', False)}")

        entities = unit.get("entities", [])
        if entities:
            print(f"  Entities:     {', '.join(entities)}")

        financial = unit.get("financial", [])
        if financial:
            print(f"  Financial:    {', '.join
main function · python · L36-L73 (38 LOC)
examples/cost_calculator.py
def main():
    if len(sys.argv) > 1:
        with open(sys.argv[1]) as f:
            text = f.read()
    else:
        text = SAMPLE_TEXT

    result = decompose_text(text)
    units = result["units"]

    total_chars = sum(len(u["text"]) for u in units)
    total_tokens = estimate_tokens(text)

    # With Decompose: only send high-attention units to the LLM
    high_attention = [u for u in units if u["attention"] >= 1.0]
    filtered_chars = sum(len(u["text"]) for u in high_attention)
    filtered_tokens = filtered_chars // CHARS_PER_TOKEN

    reduction_pct = 100 - (filtered_tokens * 100 // total_tokens) if total_tokens > 0 else 0

    print(f"--- Before Decompose (send everything to LLM) ---")
    print(f"  Units:      {len(units)}")
    print(f"  Characters: {total_chars:,}")
    print(f"  Est tokens: {total_tokens:,}")

    print(f"\n--- After Decompose (send attention >= 1.0 only) ---")
    print(f"  Units:      {len(high_attention)}")
    print(f"  Characters: {filtered_chars:
mock_embed function · python · L27-L29 (3 LOC)
examples/rag_pipeline.py
def mock_embed(text: str) -> list[float]:
    """Placeholder embedder: ignores ``text`` and returns a fresh list of eight zeros."""
    dimensions = 8
    return [0.0 for _ in range(dimensions)]  # Replace with your embedding model
main function · python · L32-L59 (28 LOC)
examples/rag_pipeline.py
def main():
    """Demo: decompose a document and embed only the units worth embedding.

    Reads the text from the file named by the first command-line argument,
    or falls back to SAMPLE_TEXT. Every unit whose attention is at least
    ATTENTION_THRESHOLD is passed to mock_embed(); the rest are skipped.
    One line is printed per unit, then a summary.
    """
    # Load text from file or use the sample
    if len(sys.argv) > 1:
        with open(sys.argv[1]) as f:
            text = f.read()
    else:
        text = SAMPLE_TEXT

    result = decompose_text(text)
    units = result["units"]

    embedded = []
    skipped = []

    for unit in units:
        if unit["attention"] >= ATTENTION_THRESHOLD:
            # A real pipeline would store this vector; the demo only shows
            # where the embedding call belongs.
            mock_embed(unit["text"])
            embedded.append(unit)
            print(f"  EMBED  [{unit['attention']:4.1f}] [{unit['authority']:12s}] {unit['text'][:70]}")
        else:
            skipped.append(unit)
            print(f"  SKIP   [{unit['attention']:4.1f}] [{unit['authority']:12s}] {unit['text'][:70]}")

    # decompose_text() yields no units for empty input; avoid dividing by zero.
    reduction_pct = len(skipped) * 100 // len(units) if units else 0

    print(f"\n--- Results ---")
    print(f"Total units:    {len(units)}")
    print(f"Embedded:       {len(embedded)} (attention >= {ATTENTION_THRESHOLD})")
    print(f"Skipped:        {len(skipped)}")
    print(f"Token reduction: ~{reduction_pct}%")
read_file function · python · L17-L29 (13 LOC)
lab/run.py
def read_file(path: Path) -> str:
    """Read text from any supported file format.

    Args:
        path: File to read. A ``.pdf`` suffix (case-insensitive) is read
            through PyMuPDF; anything else is read as text.

    Returns:
        The file's text. PDF pages are joined with blank lines. Returns an
        empty string for a PDF when PyMuPDF is not installed. Undecodable
        bytes in text files are replaced rather than raising.
    """
    if path.suffix.lower() == ".pdf":
        # Guard only the optional import, so real read errors are not
        # mistaken for a missing dependency.
        try:
            import fitz  # PyMuPDF
        except ImportError:
            return ""

        doc = fitz.open(str(path))
        try:
            return "\n\n".join(page.get_text() for page in doc)
        finally:
            # Release the document even if text extraction fails part-way.
            doc.close()
    return path.read_text(errors="replace")
process_file function · python · L32-L102 (71 LOC)
lab/run.py
def process_file(path: Path) -> dict:
    """Process a single file and return detailed results."""
    text = read_file(path)
    if not text.strip():
        return {"file": path.name, "error": "empty"}

    # Timing (100 iterations for stable measurement)
    start = time.monotonic()
    iterations = 100
    for _ in range(iterations):
        result = decompose_text(text)
    avg_ms = round((time.monotonic() - start) * 1000 / iterations, 2)

    units = result["units"]
    meta = result["meta"]

    # Unit breakdown
    authority_dist = {}
    risk_dist = {}
    type_dist = {}
    high_attention = []
    irreducible_units = []

    for i, u in enumerate(units):
        auth = u["authority"]
        authority_dist[auth] = authority_dist.get(auth, 0) + 1

        risk = u.get("risk", "informational")
        risk_dist[risk] = risk_dist.get(risk, 0) + 1

        ctype = u.get("type", "narrative")
        type_dist[ctype] = type_dist.get(ctype, 0) + 1

        if u["attention"] >= 3.0:
print_report function · python · L105-L161 (57 LOC)
lab/run.py
def print_report(results: list[dict]):
    """Print a human-readable report to stdout."""
    print("=" * 70)
    print("DECOMPOSE TESTING LAB — REPORT")
    print("=" * 70)

    total_chars = 0
    total_words = 0
    total_units = 0
    total_ms = 0.0

    for r in results:
        if "error" in r:
            print(f"\n  {r['file']}: SKIPPED ({r['error']})")
            continue

        total_chars += r["chars"]
        total_words += r["words"]
        total_units += r["total_units"]
        total_ms += r["avg_ms"]

        print(f"\n{'─' * 70}")
        print(f"  {r['file']}")
        print(f"  {r['chars']:,} chars | {r['words']:,} words | {r['avg_ms']}ms")
        print(f"  {r['total_units']} units | "
              f"{r['summary']['actionable']} actionable | "
              f"{r['summary']['irreducible']} irreducible | "
              f"max attention: {r['summary']['max_attention']}")

        print(f"\n  Authority: {r['authority_distribution']}")
        print(f"  Risk:      {
main function · python · L164-L255 (92 LOC)
lab/run.py
def main():
    # Find test documents
    test_dir = Path(__file__).resolve().parent.parent / "tests" / "fixtures"

    if not test_dir.exists():
        print(f"No test directory at {test_dir}", file=sys.stderr)
        sys.exit(1)

    # Collect all text files recursively
    files = sorted(
        p for p in test_dir.rglob("*")
        if p.suffix in (".md", ".txt", ".rst", ".pdf") and p.is_file()
    )

    if not files:
        print(f"No documents found in {test_dir}", file=sys.stderr)
        sys.exit(1)

    print(f"Found {len(files)} documents in {test_dir}\n")

    results = []
    for f in files:
        results.append(process_file(f))

    # Print human-readable report
    print_report(results)

    # Save detailed results
    root = Path(__file__).resolve().parent.parent
    out_path = root / "lab" / "results.json"
    with open(out_path, "w") as fp:
        json.dump(results, fp, indent=2)
    print(f"\nFull results saved to {out_path}")

    # Build site-facing benchmar
Repobility — same analyzer, your code, free for public repos · /scan/
Chunk class · python · L13-L22 (10 LOC)
src/decompose/chunker.py
class Chunk:
    # One piece of source text produced by the chunkers in this module.
    # NOTE(review): the defaults use ``field(default_factory=...)``, so this is
    # presumably decorated with ``@dataclass`` just above this excerpt — confirm.
    chunk_id: int  # sequence number; the chunkers in this module start counting at 1
    text: str  # chunk content (the chunkers pass whitespace-stripped text)
    start: int  # start offset of the chunk in the text that was chunked
    end: int  # end offset of the chunk in the text that was chunked
    word_count: int  # number of whitespace-separated tokens in ``text``
    char_count: int  # character count recorded by the chunker (stripped vs. unstripped varies by caller)
    heading: str | None = None  # heading of the markdown section the chunk came from, if any
    heading_level: int = 0  # level of that heading; 0 when there is no heading
    heading_path: list[str] = field(default_factory=list)  # section "path" as supplied by the markdown parser
chunk_text function · python · L25-L66 (42 LOC)
src/decompose/chunker.py
def chunk_text(text: str, chunk_size: int = DEFAULT_CHUNK_SIZE, overlap: int = DEFAULT_OVERLAP) -> list[Chunk]:
    """Split text into overlapping chunks, breaking at sentence boundaries."""
    text = text.replace("\u00a0", " ")
    if not text.strip():
        return []

    if len(text) <= chunk_size:
        stripped = text.strip()
        return [Chunk(
            chunk_id=1, text=stripped, start=0, end=len(text),
            word_count=len(stripped.split()), char_count=len(text),
        )]

    chunks: list[Chunk] = []
    start = 0
    cid = 1

    while start < len(text):
        end = min(start + chunk_size, len(text))

        # Find a sentence boundary to break at
        if end < len(text):
            window = text[max(end - 150, start) : end]
            for sep in (". ", ".\n", "! ", "? ", "\n\n"):
                idx = window.rfind(sep)
                if idx > -1:
                    end = max(end - 150, start) + idx + len(sep)
                    break

        chun
_parse_markdown_sections function · python · L69-L106 (38 LOC)
src/decompose/chunker.py
def _parse_markdown_sections(text: str) -> list[dict]:
    """Parse markdown into sections delimited by ATX headers."""
    header_re = re.compile(r"^(#{1,6})\s+(.+)$", re.MULTILINE)
    matches = list(header_re.finditer(text))

    if not matches:
        return [{"heading": None, "level": 0, "parent": None, "path": [],
                 "text": text, "start": 0, "end": len(text)}]

    sections = []
    stack: list[tuple[int, str]] = []

    # Preamble before first header
    if matches[0].start() > 0:
        pre = text[: matches[0].start()]
        if pre.strip():
            sections.append({"heading": None, "level": 0, "parent": None, "path": [],
                             "text": pre, "start": 0, "end": matches[0].start()})

    for i, m in enumerate(matches):
        level = len(m.group(1))
        heading = m.group(2).strip()
        sec_start = m.start()
        sec_end = matches[i + 1].start() if i + 1 < len(matches) else len(text)

        while stack and stack[-1][0] >= l
chunk_markdown function · python · L109-L146 (38 LOC)
src/decompose/chunker.py
def chunk_markdown(text: str, chunk_size: int = DEFAULT_CHUNK_SIZE, overlap: int = DEFAULT_OVERLAP) -> list[Chunk]:
    """Split markdown by header boundaries, sub-chunking oversized sections."""
    if not text.strip():
        return []

    sections = _parse_markdown_sections(text)

    if len(sections) == 1 and sections[0]["level"] == 0:
        return chunk_text(text, chunk_size, overlap)

    chunks: list[Chunk] = []
    cid = 1

    for sec in sections:
        sec_text = sec["text"].strip()
        if not sec_text:
            continue

        if len(sec_text) <= chunk_size:
            chunks.append(Chunk(
                chunk_id=cid, text=sec_text, start=sec["start"], end=sec["end"],
                word_count=len(sec_text.split()), char_count=len(sec_text),
                heading=sec["heading"], heading_level=sec["level"], heading_path=sec["path"],
            ))
            cid += 1
        else:
            sub = chunk_text(sec_text, chunk_size, overlap)
            for
auto_chunk function · python · L149-L153 (5 LOC)
src/decompose/chunker.py
def auto_chunk(text: str, chunk_size: int = DEFAULT_CHUNK_SIZE, overlap: int = DEFAULT_OVERLAP) -> list[Chunk]:
    """Pick a chunker based on the text's format and run it.

    Text containing at least one line that starts with 1-6 ``#`` characters
    followed by whitespace is treated as markdown and handed to
    ``chunk_markdown``; everything else goes to ``chunk_text``.
    """
    has_atx_header = re.search(r"^#{1,6}\s+", text, re.MULTILINE) is not None
    chunker = chunk_markdown if has_atx_header else chunk_text
    return chunker(text, chunk_size, overlap)
Classification class · python · L108-L115 (8 LOC)
src/decompose/classifier.py
class Classification:
    # Result record filled in by classify(); every field has a neutral default.
    # NOTE(review): presumably decorated with ``@dataclass`` above this excerpt — confirm.
    authority: str = "informational"  # top-scoring authority label
    authority_score: float = 0.0  # authority score; classify() caps it at 10.0 and rounds to 2 places
    risk: str = "informational"  # top-scoring risk label
    risk_score: float = 0.0  # risk score; classify() caps it at 10.0
    content_type: str = "narrative"  # top-scoring content-type label
    actionable: bool = False  # set by classify() for certain authority or risk labels
    attention: float = 0.0  # capped authority score times a per-risk multiplier, limited to 10.0
_score_patterns function · python · L118-L136 (19 LOC)
src/decompose/classifier.py
def _score_patterns(text: str, patterns: dict[str, dict | list[str]], use_weight: bool = False) -> tuple[str, float]:
    """Score text against a pattern dict. Returns (top_label, score)."""
    scores: dict[str, float] = {}
    text_lower = text.lower() if len(text) < 50_000 else text[:50_000].lower()

    for label, value in patterns.items():
        pats = value["patterns"] if isinstance(value, dict) else value
        weight = value.get("weight", 1.0) if isinstance(value, dict) else 1.0
        count = 0
        for p in pats:
            count += len(re.findall(p, text_lower, re.IGNORECASE))
        if count > 0:
            scores[label] = count * weight

    if not scores:
        return ("informational", 0.0)

    top = max(scores, key=scores.get)  # type: ignore[arg-type]
    return (top, scores[top])
classify function · python · L139-L166 (28 LOC)
src/decompose/classifier.py
def classify(text: str) -> Classification:
    """Classify a text passage. Pure regex, no LLM, deterministic."""
    authority, auth_score = _score_patterns(text, AUTHORITY_PATTERNS, use_weight=True)
    risk, risk_score = _score_patterns(text, RISK_PATTERNS)
    content_type, _ = _score_patterns(text, CONTENT_TYPE_PATTERNS)

    # Attention score: risk multiplier * normalized authority score
    risk_mult = {
        "safety_critical": 4.0, "security": 3.0, "compliance": 2.0,
        "financial": 1.5, "contractual": 1.5, "advisory": 0.5,
        "informational": 0.3,
    }.get(risk, 0.5)
    attention = min(10.0, round(min(auth_score, 5.0) * risk_mult, 1))

    actionable = (
        authority in ("mandatory", "prohibitive", "directive")
        or risk in ("safety_critical", "security", "compliance")
    )

    return Classification(
        authority=authority,
        authority_score=round(min(auth_score, 10.0), 2),
        risk=risk,
        risk_score=round(min(risk_score, 10.0),
If a scraper extracted this row, it came from Repobility (https://repobility.com)
main function · python · L13-L51 (39 LOC)
src/decompose/cli.py
def main():
    parser = argparse.ArgumentParser(
        prog="decompose",
        description="Stop prompting. Start decomposing. Structured intelligence from any text.",
    )
    parser.add_argument("--text", "-t", help="Text to decompose (or pipe via stdin)")
    parser.add_argument("--compact", "-c", action="store_true", help="Compact output (omit zero-value fields)")
    parser.add_argument("--chunk-size", type=int, default=2000, help="Max characters per unit (default: 2000)")
    parser.add_argument("--pretty", "-p", action="store_true", help="Pretty-print JSON output")
    parser.add_argument("--serve", action="store_true", help="Run as MCP server (stdio)")
    parser.add_argument("--version", "-v", action="store_true", help="Print version")

    args = parser.parse_args()

    if args.version:
        from decompose import __version__
        print(f"decompose {__version__}")
        return

    if args.serve:
        from decompose.mcp_server import serve
        asyncio.run
Unit class · python · L16-L28 (13 LOC)
src/decompose/core.py
class Unit:
    """A single semantic unit — the atomic output of decompose."""

    # NOTE(review): the code that builds units is not visible in this excerpt;
    # the field notes below are inferred from the names and from how the
    # example scripts read unit dicts — confirm against decompose_text().
    text: str  # the unit's text
    authority: str  # authority label (examples compare it to "mandatory")
    risk: str  # risk label (examples compare it to "safety_critical", "financial", "compliance")
    content_type: str  # content-type label
    irreducible: bool  # irreducibility flag for this unit
    attention: float  # attention score; examples compare it against numeric thresholds
    actionable: bool  # actionability flag for this unit
    entities: list[str]  # entity strings attached to the unit
    heading: str | None = None  # heading associated with the unit, if any
    heading_path: list[str] = field(default_factory=list)  # heading path; a fresh list per instance
DecomposeResult class · python · L32-L36 (5 LOC)
src/decompose/core.py
class DecomposeResult:
    """Complete decompose output."""

    # NOTE(review): decompose_text() as visible here returns a plain dict with
    # the same two keys; confirm where this class is actually instantiated.
    units: list[Unit]  # the decomposed units
    meta: dict = field(default_factory=dict)  # summary metadata; a fresh dict per instance
decompose_text function · python · L39-L148 (110 LOC)
src/decompose/core.py
def decompose_text(
    text: str,
    *,
    chunk_size: int = 2000,
    overlap: int = 200,
    compact: bool = False,
) -> dict:
    """Decompose text into classified semantic units.

    Args:
        text: Raw input text.
        chunk_size: Maximum characters per chunk.
        overlap: Character overlap between chunks.
        compact: If True, omit zero-value fields for smaller output.

    Returns:
        Dictionary with 'units' list and 'meta' summary.
    """
    start = time.monotonic()

    if not text or not text.strip():
        return {"units": [], "meta": {"total_units": 0, "error": "empty_input"}}

    MAX_INPUT = 10_000_000  # 10 MB
    if len(text) > MAX_INPUT:
        return {"units": [], "meta": {"total_units": 0, "error": "input_too_large", "max_bytes": MAX_INPUT}}

    chunk_size = max(100, min(chunk_size, 100_000))
    overlap = max(0, min(overlap, chunk_size // 2))

    # Chunk
    chunks = auto_chunk(text, chunk_size=chunk_size, overlap=overlap)

    # Class
filter_for_llm function · python · L155-L235 (81 LOC)
src/decompose/core.py
def filter_for_llm(
    result: dict,
    *,
    authorities: tuple[str, ...] = ("mandatory", "prohibitive", "directive", "conditional"),
    risks: tuple[str, ...] = ("safety_critical", "compliance", "financial", "contractual"),
    types: tuple[str, ...] = ("requirement", "constraint", "data", "definition"),
    min_attention: float = 0.0,
    include_headings: bool = True,
    max_tokens: int = 0,
) -> dict:
    """Filter decompose result to high-value units for LLM consumption.

    Takes the output of decompose_text() and returns only units that
    match the specified authority, risk, or content type criteria.
    Pattern proven in RBS Policy QC: Decompose as pre-filter before LLM
    extraction dramatically improves quality and reduces hallucination.

    Args:
        result: Output from decompose_text().
        authorities: Authority levels to keep.
        risks: Risk levels to keep.
        types: Content types to keep.
        min_attention: Minimum attention score (0.0 = 
Entities class · python · L41-L45 (5 LOC)
src/decompose/entities.py
class Entities:
    # Container for what extract_entities() finds; each list defaults to empty.
    # NOTE(review): presumably decorated with ``@dataclass`` above this excerpt — confirm.
    standards: list[str] = field(default_factory=list)  # matches of the standards / building-code / OSHA patterns
    dates: list[str] = field(default_factory=list)  # matches of the numeric and written date patterns
    financial: list[str] = field(default_factory=list)  # dollar amounts ("$...") and percentages ("...%")
    references: list[str] = field(default_factory=list)  # CFR reference matches
extract_entities function · python · L48-L82 (35 LOC)
src/decompose/entities.py
def extract_entities(text: str) -> Entities:
    """Extract structured entities from text. Pure regex, deterministic."""
    standards: list[str] = []
    dates: list[str] = []
    financial: list[str] = []
    references: list[str] = []

    # Standards
    for rx in (_STANDARD_US, _STANDARD_INTL, _BUILDING_CODE, _OSHA):
        for m in rx.finditer(text):
            standards.append(m.group(0).strip())

    # CFR references
    for m in _CFR.finditer(text):
        references.append(m.group(0).strip())

    # Dates
    for m in _DATE_MDY.finditer(text):
        dates.append(m.group(0))
    for m in _DATE_WRITTEN.finditer(text):
        dates.append(m.group(0))

    # Financial
    for m in _DOLLAR.finditer(text):
        financial.append(f"${m.group(1)}")
    for m in _PERCENT.finditer(text):
        financial.append(f"{m.group(1)}%")

    # Deduplicate preserving order
    return Entities(
        standards=list(dict.fromkeys(standards)),
        dates=list(dict.fromkeys(dates)),
 
IrreducibilityResult class · python · L23-L28 (6 LOC)
src/decompose/irreducibility.py
class IrreducibilityResult:
    # Outcome of detect_irreducibility().
    # NOTE(review): presumably decorated with ``@dataclass`` above this excerpt — confirm.
    irreducible: bool  # True when at least one pattern matched
    confidence: float  # 0.2 per match, capped at 1.0
    recommendation: str  # "PRESERVE_VERBATIM", "PRESERVE_KEY_VALUES" or "SUMMARIZABLE"
    categories: list[str] = field(default_factory=list)  # sorted categories of the patterns that matched
    match_count: int = 0  # total number of pattern matches
Provenance: Repobility (https://repobility.com) — every score reproducible from /scan/
detect_irreducibility function · python · L31-L57 (27 LOC)
src/decompose/irreducibility.py
def detect_irreducibility(text: str) -> IrreducibilityResult:
    """Score how strongly ``text`` matches the irreducible-content patterns.

    Every ``(pattern, category)`` pair in IRREDUCIBLE_PATTERNS is searched
    case-insensitively. Each match adds 0.2 to the confidence (capped at
    1.0), and the confidence selects the recommendation: at least 0.6 gives
    PRESERVE_VERBATIM, at least 0.3 gives PRESERVE_KEY_VALUES, anything
    lower gives SUMMARIZABLE.
    """
    hits = []
    seen_categories: set[str] = set()

    for pattern, category in IRREDUCIBLE_PATTERNS:
        found = [m.group(0)[:80] for m in re.finditer(pattern, text, re.IGNORECASE)]
        if found:
            hits.extend(found)
            seen_categories.add(category)

    total = len(hits)
    score = min(1.0, total * 0.2)

    if score < 0.3:
        advice = "SUMMARIZABLE"
    elif score < 0.6:
        advice = "PRESERVE_KEY_VALUES"
    else:
        advice = "PRESERVE_VERBATIM"

    return IrreducibilityResult(
        irreducible=total > 0,
        confidence=round(score, 3),
        recommendation=advice,
        categories=sorted(seen_categories),
        match_count=total,
    )
_validate_url function · python · L33-L49 (17 LOC)
src/decompose/mcp_server.py
def _validate_url(url: str) -> None:
    """Reject URLs targeting internal/private networks or non-HTTP schemes.

    Args:
        url: Candidate URL supplied by the caller.

    Raises:
        ValueError: If the scheme is not http/https, the URL has no
            hostname, the hostname cannot be resolved, or any resolved
            address falls inside one of the ``_BLOCKED_NETS`` networks.
    """
    parsed = urlparse(url)
    if parsed.scheme not in ("http", "https"):
        raise ValueError(f"Unsupported URL scheme: {parsed.scheme!r}")
    hostname = parsed.hostname
    if not hostname:
        raise ValueError("URL has no hostname")
    try:
        addrs = socket.getaddrinfo(hostname, None, socket.AF_UNSPEC, socket.SOCK_STREAM)
    except socket.gaierror as e:
        raise ValueError(f"Cannot resolve hostname: {e}") from None
    for _, _, _, _, sockaddr in addrs:
        ip = ipaddress.ip_address(sockaddr[0])
        candidates = [ip]
        # An IPv4-mapped IPv6 address (::ffff:a.b.c.d) reaches the embedded
        # IPv4 host, but an IPv6 address never tests as a member of an IPv4
        # network. Check the embedded address too, so e.g. ::ffff:127.0.0.1
        # cannot slip past an IPv4 block list.
        mapped = getattr(ip, "ipv4_mapped", None)
        if mapped is not None:
            candidates.append(mapped)
        for candidate in candidates:
            for net in _BLOCKED_NETS:
                if candidate in net:
                    raise ValueError(f"URL resolves to blocked address: {ip}")
_HTMLToText class · python · L54-L76 (23 LOC)
src/decompose/mcp_server.py
class _HTMLToText(HTMLParser):
    """Minimal HTML-to-text converter. No dependencies."""

    def __init__(self):
        super().__init__()
        self._parts: list[str] = []
        self._skip = False

    def handle_starttag(self, tag, attrs):
        self._skip = tag in ("script", "style", "nav", "footer", "header")
        if tag in ("p", "br", "div", "h1", "h2", "h3", "h4", "h5", "h6", "li", "tr"):
            self._parts.append("\n")

    def handle_endtag(self, tag):
        if tag in ("script", "style", "nav", "footer", "header"):
            self._skip = False

    def handle_data(self, data):
        if not self._skip:
            self._parts.append(data)

    def get_text(self) -> str:
        return "".join(self._parts).strip()
__init__ method · python · L57-L60 (4 LOC)
src/decompose/mcp_server.py
    def __init__(self):
        super().__init__()
        self._parts: list[str] = []
        self._skip = False
handle_starttag method · python · L62-L65 (4 LOC)
src/decompose/mcp_server.py
    def handle_starttag(self, tag, attrs):
        self._skip = tag in ("script", "style", "nav", "footer", "header")
        if tag in ("p", "br", "div", "h1", "h2", "h3", "h4", "h5", "h6", "li", "tr"):
            self._parts.append("\n")
handle_endtag method · python · L67-L69 (3 LOC)
src/decompose/mcp_server.py
    def handle_endtag(self, tag):
        if tag in ("script", "style", "nav", "footer", "header"):
            self._skip = False
handle_data method · python · L71-L73 (3 LOC)
src/decompose/mcp_server.py
    def handle_data(self, data):
        if not self._skip:
            self._parts.append(data)
_fetch_url function · python · L79-L96 (18 LOC)
src/decompose/mcp_server.py
class _ValidatingRedirectHandler(urllib.request.HTTPRedirectHandler):
    """Redirect handler that applies _validate_url() to every redirect target."""

    def redirect_request(self, req, fp, code, msg, headers, newurl):
        # A public URL may redirect to an internal one; re-check each hop so
        # the block list cannot be bypassed through a redirect.
        _validate_url(newurl)
        return super().redirect_request(req, fp, code, msg, headers, newurl)


def _fetch_url(url: str, timeout: int = 15) -> str:
    """Fetch URL content, convert HTML to plain text. Stdlib only.

    Args:
        url: http(s) URL to fetch. It is validated with _validate_url()
            before the request, as is every redirect target.
        timeout: Socket timeout in seconds for the request.

    Returns:
        The response body decoded as UTF-8 (invalid bytes replaced). Bodies
        served as text/markdown or text/plain are returned unchanged;
        anything else is run through the HTML-to-text converter.

    Raises:
        ValueError: If the URL or a redirect target fails validation.
        urllib.error.URLError: If the request itself fails.
    """
    _validate_url(url)
    req = urllib.request.Request(
        url,
        headers={"User-Agent": "decompose/0.1", "Accept": "text/markdown, text/plain, text/html"},
    )
    opener = urllib.request.build_opener(_ValidatingRedirectHandler)
    with opener.open(req, timeout=timeout) as resp:
        content_type = resp.headers.get("Content-Type", "")
        body = resp.read().decode("utf-8", errors="replace")

    if "text/markdown" in content_type or "text/plain" in content_type:
        return body

    # HTML → plain text
    parser = _HTMLToText()
    parser.feed(body)
    parser.close()  # flush any text the parser is still buffering
    return parser.get_text()
Open data scored by Repobility · https://repobility.com
list_tools function · python · L100-L137 (38 LOC)
src/decompose/mcp_server.py
async def list_tools() -> list[Tool]:
    return [
        Tool(
            name="decompose_text",
            description=(
                "Decompose text into classified semantic units with authority levels, "
                "risk scores, entity extraction, and irreducibility flags. "
                "No LLM required. Deterministic. Returns structured JSON."
            ),
            inputSchema={
                "type": "object",
                "properties": {
                    "text": {"type": "string", "description": "The text to decompose"},
                    "compact": {"type": "boolean", "description": "Omit zero-value fields", "default": False},
                    "chunk_size": {
                        "type": "integer", "description": "Max chars per unit (100-100000)",
                        "default": 2000, "minimum": 100, "maximum": 100000,
                    },
                },
                "required": ["text"],
            },
        ),
        Tool(
   
call_tool function · python · L141-L161 (21 LOC)
src/decompose/mcp_server.py
async def call_tool(name: str, arguments: dict) -> list[TextContent]:
    """Dispatch a tool invocation and wrap the JSON result as text content.

    ``decompose_text`` decomposes ``arguments["text"]`` with the optional
    ``compact`` and ``chunk_size`` settings. ``decompose_url`` fetches
    ``arguments["url"]`` first, reports fetch failures as an ``error``
    payload, and records the URL under ``meta.source_url``. Any other tool
    name produces an ``error`` payload.
    """

    def _reply(payload: dict, **dump_options) -> list[TextContent]:
        # Every response is a single text item holding the JSON document.
        return [TextContent(type="text", text=json.dumps(payload, **dump_options))]

    if name == "decompose_text":
        decomposed = decompose_text(
            arguments["text"],
            compact=arguments.get("compact", False),
            chunk_size=arguments.get("chunk_size", 2000),
        )
        return _reply(decomposed, indent=2)

    if name == "decompose_url":
        url = arguments["url"]
        try:
            page_text = _fetch_url(url)
        except (urllib.error.URLError, TimeoutError, OSError, ValueError) as e:
            return _reply({"error": f"Failed to fetch URL: {e}"})

        decomposed = decompose_text(page_text, compact=arguments.get("compact", False))
        decomposed["meta"]["source_url"] = url
        return _reply(decomposed, indent=2)

    return _reply({"error": f"Unknown tool: {name}"})
serve function · python · L164-L167 (4 LOC)
src/decompose/mcp_server.py
async def serve():
    """Open the stdio transport and run the MCP server on it until it exits."""
    async with stdio_server() as streams:
        reader, writer = streams
        init_options = server.create_initialization_options()
        await server.run(reader, writer, init_options)