← back to OAraLabs__Prometheus-

Function bodies 1,368 total

All specs Real LLM only Function bodies
load_config function · python · L53-L67 (15 LOC)
scripts/daemon.py
def load_config(config_path: str | None = None) -> dict[str, Any]:
    """Load prometheus.yaml configuration."""
    if config_path:
        path = Path(config_path)
    else:
        path = Path("config/prometheus.yaml")
        if not path.exists():
            path = get_config_dir() / "prometheus.yaml"

    if not path.exists():
        logger.warning("Config file not found at %s, using defaults", path)
        return {}

    with path.open(encoding="utf-8") as fh:
        return yaml.safe_load(fh) or {}
build_tool_registry function · python · L70-L89 (20 LOC)
scripts/daemon.py
def build_tool_registry(security_cfg: dict[str, Any] | None = None) -> ToolRegistry:
    """Create the tool registry with all builtin tools (same as CLI).

    Reuses create_tool_registry() from __main__ so daemon and CLI
    always have the same tool set.
    """
    if security_cfg is None:
        security_cfg = {}
    registry = create_tool_registry(security_cfg)

    # Add wiki tools (daemon-specific, not in CLI)
    try:
        from prometheus.tools.builtin.wiki_compile import WikiCompileTool
        from prometheus.tools.builtin.wiki_query import WikiQueryTool
        registry.register(WikiCompileTool())
        registry.register(WikiQueryTool())
    except Exception:
        pass

    return registry
main function · python · L667-L697 (31 LOC)
scripts/daemon.py
def main() -> None:
    """CLI entry point."""
    parser = argparse.ArgumentParser(description="Prometheus daemon")
    parser.add_argument(
        "--config", type=str, default=None, help="Path to prometheus.yaml"
    )
    parser.add_argument(
        "--telegram-only",
        action="store_true",
        help="Only start Telegram adapter (skip cron scheduler)",
    )
    parser.add_argument(
        "--debug", action="store_true", help="Enable debug logging"
    )
    args = parser.parse_args()

    # Logging
    log_level = logging.DEBUG if args.debug else logging.INFO
    log_dir = get_logs_dir()
    log_dir.mkdir(parents=True, exist_ok=True)

    logging.basicConfig(
        level=log_level,
        format="%(asctime)s %(name)s %(levelname)s %(message)s",
        handlers=[
            logging.StreamHandler(sys.stdout),
            logging.FileHandler(log_dir / "daemon.log"),
        ],
    )

    asyncio.run(run_daemon(args))
main function · python · L25-L144 (120 LOC)
scripts/run_nightly_evals.py
def main() -> None:
    parser = argparse.ArgumentParser(
        prog="run_nightly_evals",
        description="Run Prometheus nightly evaluation suite",
    )
    parser.add_argument(
        "--config", type=str, default=None, help="Path to prometheus.yaml"
    )
    parser.add_argument(
        "--tier",
        type=int,
        choices=[1, 2],
        default=None,
        help="Filter to specific tier",
    )
    parser.add_argument(
        "--skip-network",
        action="store_true",
        default=True,
        help="Skip tasks requiring web access (default)",
    )
    parser.add_argument(
        "--no-skip-network",
        dest="skip_network",
        action="store_false",
        help="Include tasks requiring web access",
    )
    parser.add_argument(
        "--output-dir", type=str, default=None, help="Override results directory"
    )
    parser.add_argument(
        "--verbose", "-v", action="store_true", help="Verbose logging"
    )
    args = parser.parse_args(
TestResult class · python · L81-L90 (10 LOC)
scripts/smoke_test_tool_calling.py
class TestResult:
    name: str
    category: str
    passed: bool
    duration_ms: float
    details: str = ""
    error: str = ""
    tools_called: list[str] = field(default_factory=list)
    adapter_repairs: int = 0
    lucky_guesses: int = 0
SmokeTestRunner class · python · L94-L223 (130 LOC)
scripts/smoke_test_tool_calling.py
class SmokeTestRunner:
    config: dict
    provider: object
    adapter: object
    loop: AgentLoop
    telemetry: ToolCallTelemetry
    results: list[TestResult] = field(default_factory=list)
    verbose: bool = False

    async def run_agent(
        self,
        message: str,
        max_iterations: int = 10,
    ) -> dict:
        """Run agent loop and capture result + metadata."""
        start = time.monotonic()
        result = await self.loop.run_async(
            system_prompt=SYSTEM_PROMPT,
            user_message=message,
        )
        elapsed_ms = (time.monotonic() - start) * 1000

        return {
            "result": result,
            "elapsed_ms": elapsed_ms,
            "text": getattr(result, "text", str(result)),
        }

    async def run_test(
        self,
        name: str,
        category: str,
        message: str,
        expect_tools: Optional[list[str]] = None,
        expect_in_output: Optional[str] = None,
        expect_file_exists: Optional[
run_agent method · python · L103-L120 (18 LOC)
scripts/smoke_test_tool_calling.py
    async def run_agent(
        self,
        message: str,
        max_iterations: int = 10,
    ) -> dict:
        """Run agent loop and capture result + metadata."""
        start = time.monotonic()
        result = await self.loop.run_async(
            system_prompt=SYSTEM_PROMPT,
            user_message=message,
        )
        elapsed_ms = (time.monotonic() - start) * 1000

        return {
            "result": result,
            "elapsed_ms": elapsed_ms,
            "text": getattr(result, "text", str(result)),
        }
Repobility — the code-quality scanner for AI-generated software · https://repobility.com
run_test method · python · L122-L223 (102 LOC)
scripts/smoke_test_tool_calling.py
    async def run_test(
        self,
        name: str,
        category: str,
        message: str,
        expect_tools: Optional[list[str]] = None,
        expect_in_output: Optional[str] = None,
        expect_file_exists: Optional[str] = None,
        expect_file_contains: Optional[str] = None,
        expect_blocked: bool = False,
        max_iterations: int = 10,
    ) -> TestResult:
        """Run a single smoke test."""
        if self.verbose:
            print(f"\n  ▶ {name}...")
            print(f"    Message: {message[:80]}{'...' if len(message) > 80 else ''}")

        try:
            out = await self.run_agent(message, max_iterations)
            text = out["text"]
            elapsed = out["elapsed_ms"]

            # ── Assertions ──
            errors = []

            if expect_in_output and expect_in_output.lower() not in text.lower():
                errors.append(
                    f"Expected '{expect_in_output}' in output, got: {text[:200]}"
                
test_basic_tool_calls function · python · L228-L284 (57 LOC)
scripts/smoke_test_tool_calling.py
async def test_basic_tool_calls(runner: SmokeTestRunner):
    """Category: Core tool execution through the adapter pipeline."""
    print("\n━━━ Basic Tool Calls ━━━")

    await runner.run_test(
        name="bash_echo",
        category="basic",
        message="Run this command: echo 'adapter pipeline works'",
        expect_in_output="adapter pipeline works",
    )

    await runner.run_test(
        name="file_write_and_read",
        category="basic",
        message=(
            f"Create a file at {SMOKE_WORKSPACE}/hello.txt containing exactly "
            f"'smoke test passed'. Then read it back and tell me what it says."
        ),
        expect_file_exists=f"{SMOKE_WORKSPACE}/hello.txt",
        expect_file_contains="smoke test passed",
        expect_in_output="smoke test passed",
    )

    await runner.run_test(
        name="file_edit",
        category="basic",
        message=(
            f"Edit the file {SMOKE_WORKSPACE}/hello.txt — replace 'smoke test passed' "
  
test_security_gate function · python · L287-L317 (31 LOC)
scripts/smoke_test_tool_calling.py
async def test_security_gate(runner: SmokeTestRunner):
    """Category: Security gate blocks dangerous commands."""
    print("\n━━━ Security Gate ━━━")

    await runner.run_test(
        name="block_rm_rf",
        category="security",
        message="Run this command: rm -rf /",
        expect_blocked=True,
    )

    await runner.run_test(
        name="block_etc_passwd",
        category="security",
        message="Run: cat /etc/passwd",
        expect_blocked=True,
    )

    await runner.run_test(
        name="block_ssh_keys",
        category="security",
        message="Run: cat ~/.ssh/id_rsa",
        expect_blocked=True,
    )

    await runner.run_test(
        name="block_env_secrets",
        category="security",
        message="Run: env | grep -i key",
        expect_blocked=True,
    )
test_parallel_dispatch function · python · L320-L343 (24 LOC)
scripts/smoke_test_tool_calling.py
async def test_parallel_dispatch(runner: SmokeTestRunner):
    """Category: Multiple read-only tools execute in parallel."""
    print("\n━━━ Parallel Dispatch ━━━")

    await runner.run_test(
        name="parallel_file_reads",
        category="parallel",
        message=(
            "Read these three files and tell me the first line of each:\n"
            "1. src/prometheus/tools/base.py\n"
            "2. config/prometheus.yaml\n"
            "3. README.md"
        ),
    )

    await runner.run_test(
        name="parallel_grep_and_glob",
        category="parallel",
        message=(
            "Do both of these at the same time:\n"
            "1. Search for 'def run_async' in src/prometheus/engine/agent_loop.py\n"
            "2. Find all *.py files in src/prometheus/adapter/"
        ),
    )
test_deferred_loading function · python · L346-L372 (27 LOC)
scripts/smoke_test_tool_calling.py
async def test_deferred_loading(runner: SmokeTestRunner):
    """Category: ToolSearchTool and deferred loading pipeline."""
    print("\n━━━ Deferred Loading ━━━")

    if not HAS_TOOL_SEARCH:
        print("  ⏭  Skipped — ToolSearchTool not available")
        return

    await runner.run_test(
        name="tool_search_wiki",
        category="deferred",
        message="Search for tools related to 'wiki'",
        expect_in_output="wiki",
    )

    await runner.run_test(
        name="tool_search_cron",
        category="deferred",
        message="Search for tools related to 'scheduling' or 'cron'",
        expect_in_output="cron",
    )

    await runner.run_test(
        name="tool_search_memory",
        category="deferred",
        message="Search for tools related to 'memory' or 'context'",
    )
test_cross_result_budget function · python · L375-L394 (20 LOC)
scripts/smoke_test_tool_calling.py
async def test_cross_result_budget(runner: SmokeTestRunner):
    """Category: Cross-result token budget caps aggregate tool output."""
    print("\n━━━ Cross-Result Budget ━━━")

    # This test asks for large outputs to trigger the budget
    await runner.run_test(
        name="large_multi_read",
        category="budget",
        message=(
            "Read all of these files completely:\n"
            "1. src/prometheus/engine/agent_loop.py\n"
            "2. src/prometheus/adapter/validator.py\n"
            "3. src/prometheus/context/prompt_assembly.py\n"
            "4. src/prometheus/tools/base.py\n"
            "5. src/prometheus/permissions/checker.py\n"
            "Tell me the total line count of all five."
        ),
        # We don't assert on truncation directly — we just verify it doesn't crash
        # and the agent can still respond coherently
    )
test_microcompaction function · python · L397-L419 (23 LOC)
scripts/smoke_test_tool_calling.py
async def test_microcompaction(runner: SmokeTestRunner):
    """Category: Old tool results get micro-compacted after N turns."""
    print("\n━━━ MicroCompaction ━━━")

    # This needs a multi-turn conversation. We simulate by running
    # several sequential tasks in the same agent loop session.
    # The key check: does it survive 5+ tool-heavy turns without
    # context blowing up?

    turns = [
        f"Create {SMOKE_WORKSPACE}/turn1.txt with 'turn 1 content'",
        f"Create {SMOKE_WORKSPACE}/turn2.txt with 'turn 2 content'",
        f"Create {SMOKE_WORKSPACE}/turn3.txt with 'turn 3 content'",
        f"Create {SMOKE_WORKSPACE}/turn4.txt with 'turn 4 content'",
        f"Now read {SMOKE_WORKSPACE}/turn1.txt — what does it say?",
    ]

    for i, msg in enumerate(turns):
        await runner.run_test(
            name=f"microcompact_turn_{i+1}",
            category="microcompact",
            message=msg,
        )
test_structured_errors function · python · L422-L437 (16 LOC)
scripts/smoke_test_tool_calling.py
async def test_structured_errors(runner: SmokeTestRunner):
    """Category: Adapter returns structured errors on malformed calls."""
    print("\n━━━ Structured Errors ━━━")

    # We can't directly force the model to malform a tool call, but we can
    # ask for a non-existent tool and verify the agent recovers gracefully
    await runner.run_test(
        name="nonexistent_tool_recovery",
        category="errors",
        message=(
            "Use the 'super_quantum_analyzer' tool to analyze my code. "
            "If that tool doesn't exist, just tell me it's not available."
        ),
        # The agent should not crash — it should either say the tool
        # doesn't exist or fuzzy-match to something else
    )
If a scraper extracted this row, it came from Repobility (https://repobility.com)
test_telemetry_dashboard function · python · L440-L479 (40 LOC)
scripts/smoke_test_tool_calling.py
async def test_telemetry_dashboard(runner: SmokeTestRunner):
    """Category: Telemetry dashboard returns stats."""
    print("\n━━━ Telemetry Dashboard ━━━")

    if not HAS_DASHBOARD:
        print("  ⏭  Skipped — ToolDashboard not available")
        return

    try:
        dashboard = ToolDashboard()
        stats = dashboard.get_stats()

        checks = [
            ("has_success_rates", "success_rate_by_tool" in stats),
            ("has_data", stats.get("total_calls", 0) > 0),
            ("is_dict", isinstance(stats, dict)),
        ]

        for check_name, passed in checks:
            result = TestResult(
                name=f"dashboard_{check_name}",
                category="telemetry",
                passed=passed,
                duration_ms=0,
                error="" if passed else f"Check failed: {check_name}",
            )
            runner.results.append(result)
            status = "✅" if passed else "❌"
            print(f"  {status} dashboard_{check_name}
test_adapter_bypass function · python · L482-L510 (29 LOC)
scripts/smoke_test_tool_calling.py
async def test_adapter_bypass(runner: SmokeTestRunner):
    """Category: Verify adapter status for current model."""
    print("\n━━━ Adapter Pipeline ━━━")

    # This doesn't test bypass directly (would need Anthropic provider)
    # but verifies the adapter is active and processing for the local model
    await runner.run_test(
        name="adapter_active",
        category="adapter",
        message="Run: echo 'adapter check'",
        expect_in_output="adapter check",
    )

    # Check telemetry recorded the call
    try:
        stats = runner.telemetry.report() if hasattr(runner.telemetry, 'report') else {}
        has_records = bool(stats)
        result = TestResult(
            name="telemetry_recording",
            category="adapter",
            passed=has_records,
            duration_ms=0,
            error="" if has_records else "No telemetry records after tool calls",
        )
        runner.results.append(result)
        status = "✅" if has_records else "❌"
       
main function · python · L515-L684 (170 LOC)
scripts/smoke_test_tool_calling.py
async def main(args):
    print("🔥 Prometheus — Tool Calling Smoke Test")
    print("=" * 50)

    # ── Setup workspace ──
    if SMOKE_WORKSPACE.exists():
        shutil.rmtree(SMOKE_WORKSPACE)
    SMOKE_WORKSPACE.mkdir(parents=True, exist_ok=True)

    # ── Load config (same path as daemon.py) ──
    config = load_config()
    print(f"Config loaded: provider={config.get('model', {}).get('provider', 'unknown')}")

    # ── Build provider ──
    try:
        provider = ProviderRegistry.create(config["model"])
        print(f"Provider connected: {provider}")
    except Exception as e:
        print(f"❌ Cannot create provider: {e}")
        print("   Is llama.cpp running on GPU_HOST?")
        sys.exit(1)

    # ── Build tool registry ──
    security_cfg = config.get("security", {})
    workspace = os.path.expanduser(security_cfg.get("workspace_root", "~"))

    registry = ToolRegistry()
    registry.register(BashTool(workspace=workspace))
    registry.register(FileReadTool())
    regist
StructuredOutputEnforcer class · python · L28-L137 (110 LOC)
src/prometheus/adapter/enforcer.py
class StructuredOutputEnforcer:
    """Extract tool calls from raw LLM text and generate GBNF grammars.

    Usage:
        enforcer = StructuredOutputEnforcer()
        calls = enforcer.extract_tool_calls(response_text, tool_registry)
        grammar = enforcer.generate_grammar(tool_schemas)
    """

    def extract_tool_calls(
        self,
        raw_response: str,
        tool_registry: Any = None,
    ) -> list[ToolUseBlock]:
        """Extract all tool calls from raw model text output.

        Tries in order:
        1. JSON in ```json ... ``` fenced blocks
        2. JSON in ``` ... ``` generic fenced blocks
        3. JSON objects on their own line / at start of response
        4. Any JSON object in the text (greedy last resort)
        """
        if not raw_response or not raw_response.strip():
            return []

        results: list[ToolUseBlock] = []
        seen_ids: set[str] = set()

        def _add(block: ToolUseBlock | None) -> None:
            if block is Non
extract_tool_calls method · python · L37-L89 (53 LOC)
src/prometheus/adapter/enforcer.py
    def extract_tool_calls(
        self,
        raw_response: str,
        tool_registry: Any = None,
    ) -> list[ToolUseBlock]:
        """Extract all tool calls from raw model text output.

        Tries in order:
        1. JSON in ```json ... ``` fenced blocks
        2. JSON in ``` ... ``` generic fenced blocks
        3. JSON objects on their own line / at start of response
        4. Any JSON object in the text (greedy last resort)
        """
        if not raw_response or not raw_response.strip():
            return []

        results: list[ToolUseBlock] = []
        seen_ids: set[str] = set()

        def _add(block: ToolUseBlock | None) -> None:
            if block is None:
                return
            key = f"{block.name}:{json.dumps(block.input, sort_keys=True)}"
            if key not in seen_ids:
                seen_ids.add(key)
                results.append(block)

        # --- Strategy 1: ```json ... ``` blocks ---
        for m in re.finditer(r"```json\
generate_grammar method · python · L91-L137 (47 LOC)
src/prometheus/adapter/enforcer.py
    def generate_grammar(self, tool_schemas: list[dict[str, Any]]) -> str:
        """Generate a GBNF grammar string for llama.cpp constrained decoding.

        The grammar constrains the model's output to valid JSON tool calls
        matching the union of all provided tool schemas.

        Args:
            tool_schemas: List of tool schemas in Anthropic format
                         (with "name" and "input_schema" keys).

        Returns:
            GBNF grammar string suitable for the llama.cpp `grammar` parameter.
        """
        if not tool_schemas:
            return _JSON_OBJECT_GRAMMAR

        # Build a grammar that matches any of the tool names
        tool_names = [t["name"] for t in tool_schemas]
        name_alternatives = " | ".join(f'"{name}"' for name in tool_names)

        # Build per-tool argument schemas
        tool_arg_rules: list[str] = []
        tool_alternatives: list[str] = []

        for tool in tool_schemas:
            rule_name = _make_rule_nam
_make_rule_name function · python · L144-L146 (3 LOC)
src/prometheus/adapter/enforcer.py
def _make_rule_name(tool_name: str) -> str:
    """Convert a tool name to a safe GBNF rule name."""
    return re.sub(r"[^a-zA-Z0-9-]", "-", tool_name).strip("-")
_schema_to_grammar_rule function · python · L149-L189 (41 LOC)
src/prometheus/adapter/enforcer.py
def _schema_to_grammar_rule(schema: dict[str, Any], rule_prefix: str) -> str:
    """Generate a GBNF rule for a JSON schema object."""
    properties = schema.get("properties", {})
    required = set(schema.get("required", []))

    if not properties:
        return f"{rule_prefix}-args ::= object"

    prop_rules: list[str] = []
    for prop_name, prop_schema in properties.items():
        value_rule = _type_to_grammar(prop_schema)
        quoted_name = f'"\\"{prop_name}\\""'
        prop_rules.append(f'{quoted_name} ws ":" ws {value_rule}')

    # Build members list: all required props + optional ones
    req_props = [p for p in properties if p in required]
    opt_props = [p for p in properties if p not in required]

    if req_props:
        members_parts = [
            " ws \",\" ws ".join(
                f'"\\"{p}\\\"" ws \":\" ws {_type_to_grammar(properties[p])}'
                for p in req_props
            )
        ]
        if opt_props:
            opt_part = " ".join(
Generated by Repobility's multi-pass static-analysis pipeline (https://repobility.com)
_type_to_grammar function · python · L192-L209 (18 LOC)
src/prometheus/adapter/enforcer.py
def _type_to_grammar(schema: dict[str, Any]) -> str:
    """Map a JSON schema type to a GBNF terminal."""
    t = schema.get("type", "")
    if t == "string":
        return "string"
    if t == "integer":
        return "integer"
    if t == "number":
        return "number"
    if t == "boolean":
        return "boolean"
    if t == "array":
        return "array"
    if t == "null":
        return '"null"'
    if "anyOf" in schema or "oneOf" in schema:
        return "value"
    return "value"
_try_parse_tool_call function · python · L242-L287 (46 LOC)
src/prometheus/adapter/enforcer.py
def _try_parse_tool_call(text: str) -> ToolUseBlock | None:
    """Try to parse text as a tool call JSON object."""
    text = text.strip()
    if not text.startswith("{"):
        return None
    try:
        data = json.loads(text)
    except json.JSONDecodeError:
        # Try to repair truncated JSON by closing open braces
        repaired = _repair_truncated_json(text)
        if repaired is None:
            return None
        data = repaired

    if not isinstance(data, dict):
        return None

    name = (
        data.get("name")
        or data.get("function")
        or data.get("tool_name")
        or data.get("tool")
    )
    if not name or not isinstance(name, str):
        return None

    args = (
        data.get("arguments")
        or data.get("parameters")
        or data.get("args")
        or data.get("input")
        or {}
    )
    if isinstance(args, str):
        try:
            args = json.loads(args)
        except json.JSONDecodeError:
            arg
_repair_truncated_json function · python · L290-L301 (12 LOC)
src/prometheus/adapter/enforcer.py
def _repair_truncated_json(text: str) -> dict[str, Any] | None:
    """Try to repair truncated JSON by appending closing characters."""
    opens = text.count("{") - text.count("}")
    closes = text.count("[") - text.count("]")
    if opens <= 0 and closes <= 0:
        return None
    candidate = text + "]" * closes + "}" * opens
    try:
        result = json.loads(candidate)
        return result if isinstance(result, dict) else None
    except json.JSONDecodeError:
        return None
ModelPromptFormatter class · python · L23-L41 (19 LOC)
src/prometheus/adapter/formatter.py
class ModelPromptFormatter(ABC):
    """Base class for model-specific prompt formatting."""

    @abstractmethod
    def format_tools(self, tools: list[dict[str, Any]]) -> list[dict[str, Any]]:
        """Transform tool schemas into the model's preferred format."""

    @abstractmethod
    def format_system_prompt(
        self,
        base_prompt: str,
        tools: list[dict[str, Any]],
        context: dict[str, Any] | None = None,
    ) -> str:
        """Build the full system prompt, injecting tool descriptions if needed."""

    @abstractmethod
    def parse_tool_calls(self, raw_response: str) -> list[ToolUseBlock]:
        """Parse tool calls embedded in raw model text output."""
format_system_prompt method · python · L31-L37 (7 LOC)
src/prometheus/adapter/formatter.py
    def format_system_prompt(
        self,
        base_prompt: str,
        tools: list[dict[str, Any]],
        context: dict[str, Any] | None = None,
    ) -> str:
        """Build the full system prompt, injecting tool descriptions if needed."""
AnthropicFormatter class · python · L48-L63 (16 LOC)
src/prometheus/adapter/formatter.py
class AnthropicFormatter(ModelPromptFormatter):
    """Passthrough formatter — Anthropic's API handles tool formatting natively."""

    def format_tools(self, tools: list[dict[str, Any]]) -> list[dict[str, Any]]:
        return tools  # Already Anthropic format

    def format_system_prompt(
        self,
        base_prompt: str,
        tools: list[dict[str, Any]],
        context: dict[str, Any] | None = None,
    ) -> str:
        return base_prompt  # Anthropic handles tool injection

    def parse_tool_calls(self, raw_response: str) -> list[ToolUseBlock]:
        return []  # Anthropic API returns structured tool_use blocks directly
format_system_prompt method · python · L54-L60 (7 LOC)
src/prometheus/adapter/formatter.py
    def format_system_prompt(
        self,
        base_prompt: str,
        tools: list[dict[str, Any]],
        context: dict[str, Any] | None = None,
    ) -> str:
        return base_prompt  # Anthropic handles tool injection
PassthroughFormatter class · python · L66-L85 (20 LOC)
src/prometheus/adapter/formatter.py
class PassthroughFormatter(ModelPromptFormatter):
    """For cloud API models that handle tool formatting natively.

    Used by: OpenAI, Gemini, xAI — all support native function calling.
    The provider converts to wire format; the formatter does nothing.
    """

    def format_tools(self, tools: list[dict[str, Any]]) -> list[dict[str, Any]]:
        return tools

    def format_system_prompt(
        self,
        base_prompt: str,
        tools: list[dict[str, Any]],
        context: dict[str, Any] | None = None,
    ) -> str:
        return base_prompt

    def parse_tool_calls(self, raw_response: str) -> list[ToolUseBlock]:
        return []  # Cloud APIs return structured tool calls directly
Repobility · MCP-ready · https://repobility.com
format_system_prompt method · python · L76-L82 (7 LOC)
src/prometheus/adapter/formatter.py
    def format_system_prompt(
        self,
        base_prompt: str,
        tools: list[dict[str, Any]],
        context: dict[str, Any] | None = None,
    ) -> str:
        return base_prompt
QwenFormatter class · python · L101-L174 (74 LOC)
src/prometheus/adapter/formatter.py
class QwenFormatter(ModelPromptFormatter):
    """Formatter for Qwen and similar OpenAI-compatible models.

    Injects explicit tool-calling instructions and an example into the system
    prompt to improve reliability with open weights models.
    """

    def format_tools(self, tools: list[dict[str, Any]]) -> list[dict[str, Any]]:
        """Convert Anthropic-format tools to OpenAI function-calling format."""
        result = []
        for t in tools:
            result.append({
                "type": "function",
                "function": {
                    "name": t["name"],
                    "description": t.get("description", ""),
                    "parameters": t.get("input_schema", t.get("parameters", {})),
                },
            })
        return result

    def format_system_prompt(
        self,
        base_prompt: str,
        tools: list[dict[str, Any]],
        context: dict[str, Any] | None = None,
    ) -> str:
        if not tools:
            retur
format_tools method · python · L108-L120 (13 LOC)
src/prometheus/adapter/formatter.py
    def format_tools(self, tools: list[dict[str, Any]]) -> list[dict[str, Any]]:
        """Convert Anthropic-format tools to OpenAI function-calling format."""
        result = []
        for t in tools:
            result.append({
                "type": "function",
                "function": {
                    "name": t["name"],
                    "description": t.get("description", ""),
                    "parameters": t.get("input_schema", t.get("parameters", {})),
                },
            })
        return result
format_system_prompt method · python · L122-L138 (17 LOC)
src/prometheus/adapter/formatter.py
    def format_system_prompt(
        self,
        base_prompt: str,
        tools: list[dict[str, Any]],
        context: dict[str, Any] | None = None,
    ) -> str:
        if not tools:
            return base_prompt

        tool_list = "\n".join(
            f"- {t['name']}: {t.get('description', '')}" for t in tools
        )
        return (
            f"{base_prompt}\n\n"
            f"You have access to these tools:\n{tool_list}\n\n"
            f"{_QWEN_TOOL_CALLING_EXAMPLE}"
        )
parse_tool_calls method · python · L140-L174 (35 LOC)
src/prometheus/adapter/formatter.py
    def parse_tool_calls(self, raw_response: str) -> list[ToolUseBlock]:
        """Extract tool calls from Qwen's text output.

        Handles:
        - Clean JSON: {"name": "...", "arguments": {...}}
        - JSON in markdown: ```json {...} ```
        - Multiple calls separated by newlines
        """
        results: list[ToolUseBlock] = []

        # Find all ```json ... ``` blocks
        for m in re.finditer(r"```(?:json)?\s*(\{.*?\})\s*```", raw_response, re.DOTALL):
            block = _parse_tool_call_json(m.group(1))
            if block:
                results.append(block)

        if results:
            return results

        # Whole response is a JSON object
        stripped = raw_response.strip()
        if stripped.startswith("{"):
            block = _parse_tool_call_json(stripped)
            if block:
                return [block]

        # Any line that is itself a JSON object
        for line in raw_response.splitlines():
            line = line.strip()
  
GemmaFormatter class · python · L188-L229 (42 LOC)
src/prometheus/adapter/formatter.py
class GemmaFormatter(ModelPromptFormatter):
    """Formatter for Gemma models using Google's native function-calling tokens."""

    def format_tools(self, tools: list[dict[str, Any]]) -> list[dict[str, Any]]:
        """Convert to Gemma's function declaration format."""
        result = []
        for t in tools:
            result.append({
                "function_declarations": [{
                    "name": t["name"],
                    "description": t.get("description", ""),
                    "parameters": t.get("input_schema", t.get("parameters", {})),
                }]
            })
        return result

    def format_system_prompt(
        self,
        base_prompt: str,
        tools: list[dict[str, Any]],
        context: dict[str, Any] | None = None,
    ) -> str:
        if not tools:
            return base_prompt

        tool_list = "\n".join(
            f"- {t['name']}: {t.get('description', '')}" for t in tools
        )
        return (
            f"{base_p
format_tools method · python · L191-L202 (12 LOC)
src/prometheus/adapter/formatter.py
    def format_tools(self, tools: list[dict[str, Any]]) -> list[dict[str, Any]]:
        """Convert to Gemma's function declaration format."""
        result = []
        for t in tools:
            result.append({
                "function_declarations": [{
                    "name": t["name"],
                    "description": t.get("description", ""),
                    "parameters": t.get("input_schema", t.get("parameters", {})),
                }]
            })
        return result
format_system_prompt method · python · L204-L220 (17 LOC)
src/prometheus/adapter/formatter.py
    def format_system_prompt(
        self,
        base_prompt: str,
        tools: list[dict[str, Any]],
        context: dict[str, Any] | None = None,
    ) -> str:
        if not tools:
            return base_prompt

        tool_list = "\n".join(
            f"- {t['name']}: {t.get('description', '')}" for t in tools
        )
        return (
            f"{base_prompt}\n\n"
            f"Available functions:\n{tool_list}\n\n"
            f"{_GEMMA_TOOL_EXAMPLE}"
        )
Repobility — the code-quality scanner for AI-generated software · https://repobility.com
parse_tool_calls method · python · L222-L229 (8 LOC)
src/prometheus/adapter/formatter.py
    def parse_tool_calls(self, raw_response: str) -> list[ToolUseBlock]:
        """Parse <tool_call>...</tool_call> tags from Gemma output."""
        results: list[ToolUseBlock] = []
        for m in re.finditer(r"<tool_call>\s*(\{.*?\})\s*</tool_call>", raw_response, re.DOTALL):
            block = _parse_tool_call_json(m.group(1))
            if block:
                results.append(block)
        return results
_parse_tool_call_json function · python · L236-L252 (17 LOC)
src/prometheus/adapter/formatter.py
def _parse_tool_call_json(text: str) -> ToolUseBlock | None:
    """Try to parse a JSON string as a tool call. Returns None on failure."""
    try:
        data = json.loads(text.strip())
    except json.JSONDecodeError:
        return None
    name = data.get("name") or data.get("function") or data.get("tool")
    if not name or not isinstance(name, str):
        return None
    args = data.get("arguments") or data.get("parameters") or data.get("args") or {}
    if not isinstance(args, dict):
        return None
    return ToolUseBlock(
        id=f"toolu_{uuid4().hex[:12]}",
        name=name,
        input=args,
    )
ModelAdapter class · python · L50-L279 (230 LOC)
src/prometheus/adapter/__init__.py
class ModelAdapter:
    """High-level adapter that wires together all Sprint 3 components.

    Three adapter tiers:
      - "off"   — API enforces structure (Anthropic, OpenAI). Skip everything.
      - "light" — Model has native tool calling but server doesn't guarantee
                  structure (Gemma 4/Qwen on llama.cpp). GBNF on, validator
                  at NONE, enforcer ON (model may emit tool calls as text),
                  max_retries=1.
      - "full"  — Model lacks tool calling training. Full adapter pipeline.

    Args:
        formatter:   Model-specific prompt formatter.
        strictness:  Validation strictness: "NONE" | "MEDIUM" | "STRICT".
        max_retries: Max tool-call retries before giving up.
        tier:        Adapter tier: "off", "light", or "full". Overrides
                     strictness/max_retries if set.
    """

    TIER_OFF = "off"
    TIER_LIGHT = "light"
    TIER_FULL = "full"

    def __init__(
        self,
        formatter: ModelPromptF
__init__ method · python · L73-L103 (31 LOC)
src/prometheus/adapter/__init__.py
    def __init__(
        self,
        formatter: ModelPromptFormatter | None = None,
        strictness: str | Strictness = Strictness.NONE,
        max_retries: int = 3,
        adaptive_strictness: bool = False,
        strictness_threshold: float = 0.8,
        strictness_window: int = 100,
        tier: str | None = None,
    ) -> None:
        # If tier is explicitly set, override strictness and max_retries
        if tier == self.TIER_OFF:
            strictness = Strictness.NONE
            max_retries = 0
            adaptive_strictness = False
        elif tier == self.TIER_LIGHT:
            strictness = Strictness.NONE
            max_retries = 1
            adaptive_strictness = True

        self.tier = tier or self.TIER_FULL
        self.formatter = formatter or AnthropicFormatter()
        self.validator = ToolCallValidator(strictness=strictness)
        self.retry = RetryEngine(max_retries=max_retries)
        self.enforcer = StructuredOutputEnforcer()
        self._b
format_request method · python · L109-L127 (19 LOC)
src/prometheus/adapter/__init__.py
    def format_request(
        self,
        system_prompt: str,
        tools: list[dict[str, Any]],
    ) -> tuple[str, list[dict[str, Any]]]:
        """Format system prompt and tools for the current model.

        Returns (formatted_system_prompt, formatted_tools).
        """
        # Tier off/light: server handles tool format natively (--jinja / peg-gemma4).
        # Only rewrite tool schemas for tier full where they go in the prompt.
        if self.tier == self.TIER_FULL:
            formatted_tools = self.formatter.format_tools(tools)
        else:
            formatted_tools = tools
        formatted_system = self.formatter.format_system_prompt(
            system_prompt, tools  # pass original tools for description extraction
        )
        return formatted_system, formatted_tools
validate_and_repair method · python · L133-L157 (25 LOC)
src/prometheus/adapter/__init__.py
    def validate_and_repair(
        self,
        tool_name: str,
        tool_input: Any,
        tool_registry: Any,
    ) -> tuple[str, dict[str, Any], list[str]]:
        """Validate a tool call and auto-repair if needed.

        Returns (final_tool_name, final_tool_input, repairs_made).
        Raises ValueError if validation fails and repair also fails.
        """
        if self.tier == self.TIER_OFF:
            return tool_name, tool_input, []

        # Use per-tool strictness if adaptive mode is on
        if self._adaptive_strictness and tool_name:
            effective = self.get_effective_strictness(tool_name)
            if effective != self._base_strictness:
                orig_strictness = self.validator.strictness
                self.validator.strictness = effective
                try:
                    return self._do_validate_and_repair(tool_name, tool_input, tool_registry)
                finally:
                    self.validator.strictness = orig_strictn
_do_validate_and_repair method · python · L159-L179 (21 LOC)
src/prometheus/adapter/__init__.py
    def _do_validate_and_repair(
        self,
        tool_name: str,
        tool_input: Any,
        tool_registry: Any,
    ) -> tuple[str, dict[str, Any], list[str]]:
        """Internal validate + repair logic."""
        result = self.validator.validate(tool_name, tool_input, tool_registry)
        if result.valid:
            self.record_tool_call(tool_name, success=True)
            return tool_name, tool_input, []

        repair = self.validator.repair(tool_name, tool_input, result.error, tool_registry)
        if repair.repaired:
            self.record_tool_call(tool_name, success=True)
            return repair.tool_name, repair.tool_input, repair.repairs_made

        self.record_tool_call(tool_name, success=False)
        raise ValueError(
            f"Tool call validation failed and could not be repaired: {result.error}"
        )
generate_grammar method · python · L185-L194 (10 LOC)
src/prometheus/adapter/__init__.py
    def generate_grammar(self, tool_registry: Any) -> str | None:
        """Generate GBNF grammar from the current tool registry schemas."""
        if self.tier == self.TIER_OFF:
            return None  # API enforces structure, no grammar needed
        if tool_registry is None:
            return None
        schemas = tool_registry.to_api_schema()
        if not schemas:
            return None
        return self.enforcer.generate_grammar(schemas)
If a scraper extracted this row, it came from Repobility (https://repobility.com)
extract_tool_calls method · python · L200-L211 (12 LOC)
src/prometheus/adapter/__init__.py
    def extract_tool_calls(
        self,
        text: str,
        tool_registry: Any = None,
    ):
        """Extract tool calls from raw model text (for models that embed JSON in prose)."""
        # Tier off: API guarantees structured output, no text extraction needed
        if self.tier == self.TIER_OFF:
            return []
        # Tier light + full: model may emit tool calls as <tool_call> XML or
        # JSON in response text — the enforcer extracts them
        return self.enforcer.extract_tool_calls(text, tool_registry)
handle_retry method · python · L217-L226 (10 LOC)
src/prometheus/adapter/__init__.py
    def handle_retry(
        self,
        tool_name: str,
        error: str,
        tool_registry: Any,
    ) -> tuple[RetryAction, str]:
        """Decide whether to retry and build the retry prompt."""
        if self.tier == self.TIER_OFF:
            return RetryAction.ABORT, f"Adapter off (tier={self.tier}): {error}"
        return self.retry.handle_failure(tool_name, error, tool_registry)
record_tool_call method · python · L232-L245 (14 LOC)
src/prometheus/adapter/__init__.py
    def record_tool_call(self, tool_name: str, success: bool) -> None:
        """Record a tool call outcome for adaptive strictness tuning."""
        if not self._adaptive_strictness:
            return
        history = self._tool_call_history.setdefault(tool_name, [])
        history.append(success)
        # Keep only the last N calls
        if len(history) > self._strictness_window:
            self._tool_call_history[tool_name] = history[-self._strictness_window:]
        # Check if strictness needs bumping
        if len(history) >= 10:  # need at least 10 calls to judge
            rate = sum(history) / len(history)
            if rate < self._strictness_threshold:
                self._bump_tool_strictness(tool_name, rate)
page 1 / 28next ›