Function bodies 1,368 total
load_config function · python · L53-L67 (15 LOC)scripts/daemon.py
def load_config(config_path: str | None = None) -> dict[str, Any]:
    """Read the Prometheus YAML configuration into a dictionary.

    When ``config_path`` is given it is used as-is. Otherwise the function
    looks for ``config/prometheus.yaml`` relative to the working directory
    and, if that is missing, for ``prometheus.yaml`` inside
    ``get_config_dir()``.

    Returns:
        The parsed YAML content, or an empty dict when the file does not
        exist (a warning is logged) or parses to a falsy value.
    """
    # NOTE(review): nesting reconstructed from a flattened listing — the
    # get_config_dir() fallback is assumed to apply only when no explicit
    # path was supplied; confirm against the real file.
    if config_path:
        candidate = Path(config_path)
    else:
        candidate = Path("config/prometheus.yaml")
        if not candidate.exists():
            candidate = get_config_dir() / "prometheus.yaml"

    if candidate.exists():
        with candidate.open(encoding="utf-8") as stream:
            return yaml.safe_load(stream) or {}

    logger.warning("Config file not found at %s, using defaults", candidate)
    return {}
# build_tool_registry function · python · L70-L89 (20 LOC)scripts/daemon.py
def build_tool_registry(security_cfg: dict[str, Any] | None = None) -> ToolRegistry:
    """Create the tool registry with all builtin tools (same as CLI).

    Reuses create_tool_registry() from __main__ so daemon and CLI
    always have the same tool set, then adds the daemon-only wiki tools.

    Args:
        security_cfg: Security section of the configuration; ``None`` is
            treated as an empty mapping.

    Returns:
        The populated registry. Wiki tools are registered on a best-effort
        basis: if importing or constructing them fails, the registry is
        returned without them and the reason is logged.
    """
    if security_cfg is None:
        security_cfg = {}
    registry = create_tool_registry(security_cfg)

    # Add wiki tools (daemon-specific, not in CLI). Imported lazily so a
    # missing or broken wiki module cannot prevent the daemon from starting.
    try:
        from prometheus.tools.builtin.wiki_compile import WikiCompileTool
        from prometheus.tools.builtin.wiki_query import WikiQueryTool

        registry.register(WikiCompileTool())
        registry.register(WikiQueryTool())
    except Exception as exc:
        # Still best-effort, but no longer silent: surface why the wiki
        # tools are absent instead of hiding the failure entirely.
        logger.warning("Wiki tools unavailable, continuing without them: %s", exc)
    return registry
# main function · python · L667-L697 (31 LOC)scripts/daemon.py
def main() -> None:
    """CLI entry point: parse arguments, set up logging, run the daemon.

    Logs go both to stdout and to ``daemon.log`` inside ``get_logs_dir()``
    (created if missing). The parsed argument namespace is handed to
    ``run_daemon`` unchanged.
    """
    parser = argparse.ArgumentParser(description="Prometheus daemon")
    parser.add_argument(
        "--config", type=str, default=None, help="Path to prometheus.yaml"
    )
    parser.add_argument(
        "--telegram-only",
        action="store_true",
        help="Only start Telegram adapter (skip cron scheduler)",
    )
    parser.add_argument(
        "--debug", action="store_true", help="Enable debug logging"
    )
    args = parser.parse_args()

    # Logging
    log_level = logging.DEBUG if args.debug else logging.INFO
    log_dir = get_logs_dir()
    log_dir.mkdir(parents=True, exist_ok=True)
    logging.basicConfig(
        level=log_level,
        format="%(asctime)s %(name)s %(levelname)s %(message)s",
        handlers=[
            logging.StreamHandler(sys.stdout),
            # Pin the encoding: the locale default may not be able to
            # represent non-ASCII log messages.
            logging.FileHandler(log_dir / "daemon.log", encoding="utf-8"),
        ],
    )

    asyncio.run(run_daemon(args))
# main function · python · L25-L144 (120 LOC)scripts/run_nightly_evals.py
def main() -> None:
parser = argparse.ArgumentParser(
prog="run_nightly_evals",
description="Run Prometheus nightly evaluation suite",
)
parser.add_argument(
"--config", type=str, default=None, help="Path to prometheus.yaml"
)
parser.add_argument(
"--tier",
type=int,
choices=[1, 2],
default=None,
help="Filter to specific tier",
)
parser.add_argument(
"--skip-network",
action="store_true",
default=True,
help="Skip tasks requiring web access (default)",
)
parser.add_argument(
"--no-skip-network",
dest="skip_network",
action="store_false",
help="Include tasks requiring web access",
)
parser.add_argument(
"--output-dir", type=str, default=None, help="Override results directory"
)
parser.add_argument(
"--verbose", "-v", action="store_true", help="Verbose logging"
)
args = parser.parse_args(TestResult class · python · L81-L90 (10 LOC)scripts/smoke_test_tool_calling.py
class TestResult:
    """Outcome record for one smoke-test check."""

    # NOTE(review): the ``field(default_factory=list)`` default implies a
    # ``@dataclass`` decorator sits just above this span — confirm.
    name: str  # identifier of the check
    category: str  # group the check belongs to (e.g. "telemetry", "adapter")
    passed: bool  # True when the check succeeded
    duration_ms: float  # elapsed time in milliseconds (0 for instant checks)
    details: str = ""
    error: str = ""  # failure description; empty when the check passed
    tools_called: list[str] = field(default_factory=list)
    adapter_repairs: int = 0
    lucky_guesses: int = 0
# SmokeTestRunner class · python · L94-L223 (130 LOC)scripts/smoke_test_tool_calling.py
class SmokeTestRunner:
config: dict
provider: object
adapter: object
loop: AgentLoop
telemetry: ToolCallTelemetry
results: list[TestResult] = field(default_factory=list)
verbose: bool = False
async def run_agent(
self,
message: str,
max_iterations: int = 10,
) -> dict:
"""Run agent loop and capture result + metadata."""
start = time.monotonic()
result = await self.loop.run_async(
system_prompt=SYSTEM_PROMPT,
user_message=message,
)
elapsed_ms = (time.monotonic() - start) * 1000
return {
"result": result,
"elapsed_ms": elapsed_ms,
"text": getattr(result, "text", str(result)),
}
async def run_test(
self,
name: str,
category: str,
message: str,
expect_tools: Optional[list[str]] = None,
expect_in_output: Optional[str] = None,
expect_file_exists: Optional[run_agent method · python · L103-L120 (18 LOC)scripts/smoke_test_tool_calling.py
async def run_agent(
    self,
    message: str,
    max_iterations: int = 10,
) -> dict:
    """Drive the agent loop once and time it.

    Returns a dict with the raw loop ``result``, the wall-clock
    ``elapsed_ms`` and the response ``text`` (the result's ``text``
    attribute when present, otherwise ``str(result)``).
    """
    # NOTE(review): max_iterations is accepted but never forwarded to
    # run_async — confirm whether the loop should receive it.
    began = time.monotonic()
    outcome = await self.loop.run_async(
        system_prompt=SYSTEM_PROMPT,
        user_message=message,
    )
    duration_ms = (time.monotonic() - began) * 1000

    report = {"result": outcome, "elapsed_ms": duration_ms}
    report["text"] = getattr(outcome, "text", str(outcome))
    return report
# Repobility — the code-quality scanner for AI-generated software · https://repobility.com
run_test method · python · L122-L223 (102 LOC)scripts/smoke_test_tool_calling.py
async def run_test(
self,
name: str,
category: str,
message: str,
expect_tools: Optional[list[str]] = None,
expect_in_output: Optional[str] = None,
expect_file_exists: Optional[str] = None,
expect_file_contains: Optional[str] = None,
expect_blocked: bool = False,
max_iterations: int = 10,
) -> TestResult:
"""Run a single smoke test."""
if self.verbose:
print(f"\n ▶ {name}...")
print(f" Message: {message[:80]}{'...' if len(message) > 80 else ''}")
try:
out = await self.run_agent(message, max_iterations)
text = out["text"]
elapsed = out["elapsed_ms"]
# ── Assertions ──
errors = []
if expect_in_output and expect_in_output.lower() not in text.lower():
errors.append(
f"Expected '{expect_in_output}' in output, got: {text[:200]}"
test_basic_tool_calls function · python · L228-L284 (57 LOC)scripts/smoke_test_tool_calling.py
async def test_basic_tool_calls(runner: SmokeTestRunner):
"""Category: Core tool execution through the adapter pipeline."""
print("\n━━━ Basic Tool Calls ━━━")
await runner.run_test(
name="bash_echo",
category="basic",
message="Run this command: echo 'adapter pipeline works'",
expect_in_output="adapter pipeline works",
)
await runner.run_test(
name="file_write_and_read",
category="basic",
message=(
f"Create a file at {SMOKE_WORKSPACE}/hello.txt containing exactly "
f"'smoke test passed'. Then read it back and tell me what it says."
),
expect_file_exists=f"{SMOKE_WORKSPACE}/hello.txt",
expect_file_contains="smoke test passed",
expect_in_output="smoke test passed",
)
await runner.run_test(
name="file_edit",
category="basic",
message=(
f"Edit the file {SMOKE_WORKSPACE}/hello.txt — replace 'smoke test passed' "
test_security_gate function · python · L287-L317 (31 LOC)scripts/smoke_test_tool_calling.py
async def test_security_gate(runner: SmokeTestRunner):
    """Category: Security gate blocks dangerous commands.

    Each case sends one dangerous shell request and expects it to be blocked.
    """
    print("\n━━━ Security Gate ━━━")

    blocked_cases = (
        ("block_rm_rf", "Run this command: rm -rf /"),
        ("block_etc_passwd", "Run: cat /etc/passwd"),
        ("block_ssh_keys", "Run: cat ~/.ssh/id_rsa"),
        ("block_env_secrets", "Run: env | grep -i key"),
    )
    for case_name, prompt in blocked_cases:
        await runner.run_test(
            name=case_name,
            category="security",
            message=prompt,
            expect_blocked=True,
        )
# test_parallel_dispatch function · python · L320-L343 (24 LOC)scripts/smoke_test_tool_calling.py
async def test_parallel_dispatch(runner: SmokeTestRunner):
    """Category: Multiple read-only tools execute in parallel.

    Both cases only check that the agent completes; no output is asserted.
    """
    print("\n━━━ Parallel Dispatch ━━━")

    read_prompt = "\n".join(
        [
            "Read these three files and tell me the first line of each:",
            "1. src/prometheus/tools/base.py",
            "2. config/prometheus.yaml",
            "3. README.md",
        ]
    )
    search_prompt = "\n".join(
        [
            "Do both of these at the same time:",
            "1. Search for 'def run_async' in src/prometheus/engine/agent_loop.py",
            "2. Find all *.py files in src/prometheus/adapter/",
        ]
    )

    for case_name, prompt in (
        ("parallel_file_reads", read_prompt),
        ("parallel_grep_and_glob", search_prompt),
    ):
        await runner.run_test(
            name=case_name,
            category="parallel",
            message=prompt,
        )
# test_deferred_loading function · python · L346-L372 (27 LOC)scripts/smoke_test_tool_calling.py
async def test_deferred_loading(runner: SmokeTestRunner):
    """Category: ToolSearchTool and deferred loading pipeline.

    Skipped entirely when ``HAS_TOOL_SEARCH`` is false.
    """
    print("\n━━━ Deferred Loading ━━━")
    if not HAS_TOOL_SEARCH:
        print("  ⏭  Skipped — ToolSearchTool not available")
        return

    # The last case has no output expectation: it only has to complete.
    search_cases = [
        {
            "name": "tool_search_wiki",
            "message": "Search for tools related to 'wiki'",
            "expect_in_output": "wiki",
        },
        {
            "name": "tool_search_cron",
            "message": "Search for tools related to 'scheduling' or 'cron'",
            "expect_in_output": "cron",
        },
        {
            "name": "tool_search_memory",
            "message": "Search for tools related to 'memory' or 'context'",
        },
    ]
    for case in search_cases:
        await runner.run_test(category="deferred", **case)
# test_cross_result_budget function · python · L375-L394 (20 LOC)scripts/smoke_test_tool_calling.py
async def test_cross_result_budget(runner: SmokeTestRunner):
    """Category: Cross-result token budget caps aggregate tool output."""
    print("\n━━━ Cross-Result Budget ━━━")

    # Ask for several large files at once so the budget is triggered.
    big_files = [
        "src/prometheus/engine/agent_loop.py",
        "src/prometheus/adapter/validator.py",
        "src/prometheus/context/prompt_assembly.py",
        "src/prometheus/tools/base.py",
        "src/prometheus/permissions/checker.py",
    ]
    numbered = "".join(
        f"{index}. {path}\n" for index, path in enumerate(big_files, start=1)
    )
    prompt = (
        "Read all of these files completely:\n"
        + numbered
        + "Tell me the total line count of all five."
    )

    # Truncation is not asserted directly — the check is only that the run
    # does not crash and the agent can still respond coherently.
    await runner.run_test(
        name="large_multi_read",
        category="budget",
        message=prompt,
    )
# test_microcompaction function · python · L397-L419 (23 LOC)scripts/smoke_test_tool_calling.py
async def test_microcompaction(runner: SmokeTestRunner):
    """Category: Old tool results get micro-compacted after N turns."""
    print("\n━━━ MicroCompaction ━━━")

    # A multi-turn conversation is simulated by running several sequential
    # tasks in the same agent loop session. The key check is that the
    # session survives 5+ tool-heavy turns without the context blowing up.
    prompts = [
        f"Create {SMOKE_WORKSPACE}/turn{n}.txt with 'turn {n} content'"
        for n in range(1, 5)
    ]
    prompts.append(f"Now read {SMOKE_WORKSPACE}/turn1.txt — what does it say?")

    for turn_no, prompt in enumerate(prompts, start=1):
        await runner.run_test(
            name=f"microcompact_turn_{turn_no}",
            category="microcompact",
            message=prompt,
        )
# test_structured_errors function · python · L422-L437 (16 LOC)scripts/smoke_test_tool_calling.py
async def test_structured_errors(runner: SmokeTestRunner):
    """Category: Adapter returns structured errors on malformed calls."""
    print("\n━━━ Structured Errors ━━━")

    # The model cannot be forced to malform a tool call, so instead it is
    # asked for a non-existent tool and must recover gracefully.
    prompt = (
        "Use the 'super_quantum_analyzer' tool to analyze my code. "
        "If that tool doesn't exist, just tell me it's not available."
    )

    # No output is asserted: the agent should not crash — it should either
    # say the tool doesn't exist or fuzzy-match to something else.
    await runner.run_test(
        name="nonexistent_tool_recovery",
        category="errors",
        message=prompt,
    )
# If a scraper extracted this row, it came from Repobility (https://repobility.com)
test_telemetry_dashboard function · python · L440-L479 (40 LOC)scripts/smoke_test_tool_calling.py
async def test_telemetry_dashboard(runner: SmokeTestRunner):
"""Category: Telemetry dashboard returns stats."""
print("\n━━━ Telemetry Dashboard ━━━")
if not HAS_DASHBOARD:
print(" ⏭ Skipped — ToolDashboard not available")
return
try:
dashboard = ToolDashboard()
stats = dashboard.get_stats()
checks = [
("has_success_rates", "success_rate_by_tool" in stats),
("has_data", stats.get("total_calls", 0) > 0),
("is_dict", isinstance(stats, dict)),
]
for check_name, passed in checks:
result = TestResult(
name=f"dashboard_{check_name}",
category="telemetry",
passed=passed,
duration_ms=0,
error="" if passed else f"Check failed: {check_name}",
)
runner.results.append(result)
status = "✅" if passed else "❌"
print(f" {status} dashboard_{check_name}test_adapter_bypass function · python · L482-L510 (29 LOC)scripts/smoke_test_tool_calling.py
async def test_adapter_bypass(runner: SmokeTestRunner):
"""Category: Verify adapter status for current model."""
print("\n━━━ Adapter Pipeline ━━━")
# This doesn't test bypass directly (would need Anthropic provider)
# but verifies the adapter is active and processing for the local model
await runner.run_test(
name="adapter_active",
category="adapter",
message="Run: echo 'adapter check'",
expect_in_output="adapter check",
)
# Check telemetry recorded the call
try:
stats = runner.telemetry.report() if hasattr(runner.telemetry, 'report') else {}
has_records = bool(stats)
result = TestResult(
name="telemetry_recording",
category="adapter",
passed=has_records,
duration_ms=0,
error="" if has_records else "No telemetry records after tool calls",
)
runner.results.append(result)
status = "✅" if has_records else "❌"
main function · python · L515-L684 (170 LOC)scripts/smoke_test_tool_calling.py
async def main(args):
print("🔥 Prometheus — Tool Calling Smoke Test")
print("=" * 50)
# ── Setup workspace ──
if SMOKE_WORKSPACE.exists():
shutil.rmtree(SMOKE_WORKSPACE)
SMOKE_WORKSPACE.mkdir(parents=True, exist_ok=True)
# ── Load config (same path as daemon.py) ──
config = load_config()
print(f"Config loaded: provider={config.get('model', {}).get('provider', 'unknown')}")
# ── Build provider ──
try:
provider = ProviderRegistry.create(config["model"])
print(f"Provider connected: {provider}")
except Exception as e:
print(f"❌ Cannot create provider: {e}")
print(" Is llama.cpp running on GPU_HOST?")
sys.exit(1)
# ── Build tool registry ──
security_cfg = config.get("security", {})
workspace = os.path.expanduser(security_cfg.get("workspace_root", "~"))
registry = ToolRegistry()
registry.register(BashTool(workspace=workspace))
registry.register(FileReadTool())
registStructuredOutputEnforcer class · python · L28-L137 (110 LOC)src/prometheus/adapter/enforcer.py
class StructuredOutputEnforcer:
"""Extract tool calls from raw LLM text and generate GBNF grammars.
Usage:
enforcer = StructuredOutputEnforcer()
calls = enforcer.extract_tool_calls(response_text, tool_registry)
grammar = enforcer.generate_grammar(tool_schemas)
"""
def extract_tool_calls(
self,
raw_response: str,
tool_registry: Any = None,
) -> list[ToolUseBlock]:
"""Extract all tool calls from raw model text output.
Tries in order:
1. JSON in ```json ... ``` fenced blocks
2. JSON in ``` ... ``` generic fenced blocks
3. JSON objects on their own line / at start of response
4. Any JSON object in the text (greedy last resort)
"""
if not raw_response or not raw_response.strip():
return []
results: list[ToolUseBlock] = []
seen_ids: set[str] = set()
def _add(block: ToolUseBlock | None) -> None:
if block is Nonextract_tool_calls method · python · L37-L89 (53 LOC)src/prometheus/adapter/enforcer.py
def extract_tool_calls(
self,
raw_response: str,
tool_registry: Any = None,
) -> list[ToolUseBlock]:
"""Extract all tool calls from raw model text output.
Tries in order:
1. JSON in ```json ... ``` fenced blocks
2. JSON in ``` ... ``` generic fenced blocks
3. JSON objects on their own line / at start of response
4. Any JSON object in the text (greedy last resort)
"""
if not raw_response or not raw_response.strip():
return []
results: list[ToolUseBlock] = []
seen_ids: set[str] = set()
def _add(block: ToolUseBlock | None) -> None:
if block is None:
return
key = f"{block.name}:{json.dumps(block.input, sort_keys=True)}"
if key not in seen_ids:
seen_ids.add(key)
results.append(block)
# --- Strategy 1: ```json ... ``` blocks ---
for m in re.finditer(r"```json\generate_grammar method · python · L91-L137 (47 LOC)src/prometheus/adapter/enforcer.py
def generate_grammar(self, tool_schemas: list[dict[str, Any]]) -> str:
"""Generate a GBNF grammar string for llama.cpp constrained decoding.
The grammar constrains the model's output to valid JSON tool calls
matching the union of all provided tool schemas.
Args:
tool_schemas: List of tool schemas in Anthropic format
(with "name" and "input_schema" keys).
Returns:
GBNF grammar string suitable for the llama.cpp `grammar` parameter.
"""
if not tool_schemas:
return _JSON_OBJECT_GRAMMAR
# Build a grammar that matches any of the tool names
tool_names = [t["name"] for t in tool_schemas]
name_alternatives = " | ".join(f'"{name}"' for name in tool_names)
# Build per-tool argument schemas
tool_arg_rules: list[str] = []
tool_alternatives: list[str] = []
for tool in tool_schemas:
rule_name = _make_rule_nam_make_rule_name function · python · L144-L146 (3 LOC)src/prometheus/adapter/enforcer.py
def _make_rule_name(tool_name: str) -> str:
    """Convert a tool name to a safe GBNF rule name.

    Every character outside ``[a-zA-Z0-9-]`` becomes ``-`` and leading and
    trailing dashes are removed. If nothing usable remains (empty name, or a
    name made only of punctuation) the placeholder ``"tool"`` is returned so
    that callers never build a rule with an empty identifier.
    """
    sanitized = re.sub(r"[^a-zA-Z0-9-]", "-", tool_name).strip("-")
    # An empty rule name would yield an unparseable grammar line.
    return sanitized or "tool"
# _schema_to_grammar_rule function · python · L149-L189 (41 LOC)src/prometheus/adapter/enforcer.py
def _schema_to_grammar_rule(schema: dict[str, Any], rule_prefix: str) -> str:
"""Generate a GBNF rule for a JSON schema object."""
properties = schema.get("properties", {})
required = set(schema.get("required", []))
if not properties:
return f"{rule_prefix}-args ::= object"
prop_rules: list[str] = []
for prop_name, prop_schema in properties.items():
value_rule = _type_to_grammar(prop_schema)
quoted_name = f'"\\"{prop_name}\\""'
prop_rules.append(f'{quoted_name} ws ":" ws {value_rule}')
# Build members list: all required props + optional ones
req_props = [p for p in properties if p in required]
opt_props = [p for p in properties if p not in required]
if req_props:
members_parts = [
" ws \",\" ws ".join(
f'"\\"{p}\\\"" ws \":\" ws {_type_to_grammar(properties[p])}'
for p in req_props
)
]
if opt_props:
opt_part = " ".join(
Generated by Repobility's multi-pass static-analysis pipeline (https://repobility.com)
_type_to_grammar function · python · L192-L209 (18 LOC)src/prometheus/adapter/enforcer.py
def _type_to_grammar(schema: dict[str, Any]) -> str:
    """Map a JSON schema type to a GBNF terminal.

    Recognised scalar/array types map to their dedicated rule; ``"null"``
    maps to the quoted literal. Everything else — objects, ``anyOf`` /
    ``oneOf`` unions, list-valued or missing ``type`` — falls back to the
    generic ``value`` rule.
    """
    terminals = {
        "string": "string",
        "integer": "integer",
        "number": "number",
        "boolean": "boolean",
        "array": "array",
        "null": '"null"',
    }
    declared = schema.get("type", "")
    # A list-valued "type" is unhashable, so it must skip the table lookup.
    if not isinstance(declared, str):
        return "value"
    return terminals.get(declared, "value")
# _try_parse_tool_call function · python · L242-L287 (46 LOC)src/prometheus/adapter/enforcer.py
def _try_parse_tool_call(text: str) -> ToolUseBlock | None:
"""Try to parse text as a tool call JSON object."""
text = text.strip()
if not text.startswith("{"):
return None
try:
data = json.loads(text)
except json.JSONDecodeError:
# Try to repair truncated JSON by closing open braces
repaired = _repair_truncated_json(text)
if repaired is None:
return None
data = repaired
if not isinstance(data, dict):
return None
name = (
data.get("name")
or data.get("function")
or data.get("tool_name")
or data.get("tool")
)
if not name or not isinstance(name, str):
return None
args = (
data.get("arguments")
or data.get("parameters")
or data.get("args")
or data.get("input")
or {}
)
if isinstance(args, str):
try:
args = json.loads(args)
except json.JSONDecodeError:
arg_repair_truncated_json function · python · L290-L301 (12 LOC)src/prometheus/adapter/enforcer.py
def _repair_truncated_json(text: str) -> dict[str, Any] | None:
    """Try to repair truncated JSON by appending closing characters.

    The text is scanned once while tracking string literals and the stack of
    open ``{`` / ``[`` containers. The missing closers are then appended in
    proper nesting order, so inputs such as ``{"a": [{"b": 1`` are repaired
    and braces that appear inside string values are not miscounted.

    Args:
        text: JSON text that failed to parse, presumably because the model
            output was cut off.

    Returns:
        The parsed dict when appending closers yields a JSON object; ``None``
        when nothing is left open, the text ends inside a string literal
        (closing it would invent a value), closers are mismatched, the
        repaired text still fails to parse, or the result is not a dict.
    """
    matching = {"{": "}", "[": "]"}
    pending: list[str] = []  # closers still owed, innermost last
    in_string = False
    escaped = False

    for ch in text:
        if in_string:
            if escaped:
                escaped = False
            elif ch == "\\":
                escaped = True
            elif ch == '"':
                in_string = False
            continue
        if ch == '"':
            in_string = True
        elif ch in matching:
            pending.append(matching[ch])
        elif ch in ("}", "]"):
            # A closer that does not match the innermost open container
            # means the text is malformed, not merely truncated.
            if not pending or pending.pop() != ch:
                return None

    if in_string or not pending:
        return None

    candidate = text + "".join(reversed(pending))
    try:
        result = json.loads(candidate)
    except json.JSONDecodeError:
        return None
    return result if isinstance(result, dict) else None
# ModelPromptFormatter class · python · L23-L41 (19 LOC)src/prometheus/adapter/formatter.py
class ModelPromptFormatter(ABC):
    """Base class for model-specific prompt formatting.

    Subclasses implement three hooks: converting tool schemas for the
    request, building the system prompt, and parsing tool calls that a model
    wrote into its text output.
    """

    @abstractmethod
    def format_tools(self, tools: list[dict[str, Any]]) -> list[dict[str, Any]]:
        """Transform tool schemas into the model's preferred format."""

    @abstractmethod
    def format_system_prompt(
        self,
        base_prompt: str,
        tools: list[dict[str, Any]],
        context: dict[str, Any] | None = None,
    ) -> str:
        """Build the full system prompt, injecting tool descriptions if needed."""

    @abstractmethod
    def parse_tool_calls(self, raw_response: str) -> list[ToolUseBlock]:
        """Parse tool calls embedded in raw model text output."""
# format_system_prompt method · python · L31-L37 (7 LOC)src/prometheus/adapter/formatter.py
def format_system_prompt(
    self,
    base_prompt: str,
    tools: list[dict[str, Any]],
    context: dict[str, Any] | None = None,
) -> str:
    """Build the full system prompt, injecting tool descriptions if needed.

    Args:
        base_prompt: The system prompt before any tool text is added.
        tools: Tool schemas available for this request.
        context: Optional extra data for the formatter.

    Returns:
        The system prompt to send to the model.
    """
# AnthropicFormatter class · python · L48-L63 (16 LOC)src/prometheus/adapter/formatter.py
class AnthropicFormatter(ModelPromptFormatter):
    """Passthrough formatter — Anthropic's API handles tool formatting natively.

    Tools and system prompt are returned unchanged, and no tool calls are
    ever parsed out of response text.
    """

    def format_tools(self, tools: list[dict[str, Any]]) -> list[dict[str, Any]]:
        """Return ``tools`` unchanged."""
        return tools  # Already Anthropic format

    def format_system_prompt(
        self,
        base_prompt: str,
        tools: list[dict[str, Any]],
        context: dict[str, Any] | None = None,
    ) -> str:
        """Return ``base_prompt`` unchanged; ``tools`` and ``context`` are ignored."""
        return base_prompt  # Anthropic handles tool injection

    def parse_tool_calls(self, raw_response: str) -> list[ToolUseBlock]:
        """Always return an empty list; ``raw_response`` is not inspected."""
        return []  # Anthropic API returns structured tool_use blocks directly
# format_system_prompt method · python · L54-L60 (7 LOC)src/prometheus/adapter/formatter.py
def format_system_prompt(
    self,
    base_prompt: str,
    tools: list[dict[str, Any]],
    context: dict[str, Any] | None = None,
) -> str:
    """Return ``base_prompt`` unchanged; ``tools`` and ``context`` are ignored."""
    return base_prompt  # Anthropic handles tool injection
# PassthroughFormatter class · python · L66-L85 (20 LOC)src/prometheus/adapter/formatter.py
class PassthroughFormatter(ModelPromptFormatter):
    """For cloud API models that handle tool formatting natively.

    Used by: OpenAI, Gemini, xAI — all support native function calling.
    The provider converts to wire format; the formatter does nothing.
    """

    def format_tools(self, tools: list[dict[str, Any]]) -> list[dict[str, Any]]:
        """Return ``tools`` unchanged."""
        return tools

    def format_system_prompt(
        self,
        base_prompt: str,
        tools: list[dict[str, Any]],
        context: dict[str, Any] | None = None,
    ) -> str:
        """Return ``base_prompt`` unchanged; ``tools`` and ``context`` are ignored."""
        return base_prompt

    def parse_tool_calls(self, raw_response: str) -> list[ToolUseBlock]:
        """Always return an empty list; ``raw_response`` is not inspected."""
        return []  # Cloud APIs return structured tool calls directly
# Repobility · MCP-ready · https://repobility.com
format_system_prompt method · python · L76-L82 (7 LOC)src/prometheus/adapter/formatter.py
def format_system_prompt(
    self,
    base_prompt: str,
    tools: list[dict[str, Any]],
    context: dict[str, Any] | None = None,
) -> str:
    """Return ``base_prompt`` unchanged; ``tools`` and ``context`` are ignored."""
    return base_prompt
# QwenFormatter class · python · L101-L174 (74 LOC)src/prometheus/adapter/formatter.py
class QwenFormatter(ModelPromptFormatter):
"""Formatter for Qwen and similar OpenAI-compatible models.
Injects explicit tool-calling instructions and an example into the system
prompt to improve reliability with open weights models.
"""
def format_tools(self, tools: list[dict[str, Any]]) -> list[dict[str, Any]]:
"""Convert Anthropic-format tools to OpenAI function-calling format."""
result = []
for t in tools:
result.append({
"type": "function",
"function": {
"name": t["name"],
"description": t.get("description", ""),
"parameters": t.get("input_schema", t.get("parameters", {})),
},
})
return result
def format_system_prompt(
self,
base_prompt: str,
tools: list[dict[str, Any]],
context: dict[str, Any] | None = None,
) -> str:
if not tools:
returformat_tools method · python · L108-L120 (13 LOC)src/prometheus/adapter/formatter.py
def format_tools(self, tools: list[dict[str, Any]]) -> list[dict[str, Any]]:
    """Convert Anthropic-format tools to OpenAI function-calling format.

    Each tool becomes ``{"type": "function", "function": {...}}``. The
    parameter schema is taken from ``input_schema``, falling back to
    ``parameters`` and then to an empty dict; a missing description becomes
    the empty string.
    """
    return [
        {
            "type": "function",
            "function": {
                "name": spec["name"],
                "description": spec.get("description", ""),
                "parameters": spec.get("input_schema", spec.get("parameters", {})),
            },
        }
        for spec in tools
    ]
# format_system_prompt method · python · L122-L138 (17 LOC)src/prometheus/adapter/formatter.py
def format_system_prompt(
    self,
    base_prompt: str,
    tools: list[dict[str, Any]],
    context: dict[str, Any] | None = None,
) -> str:
    """Append a tool listing and the Qwen tool-calling example to the prompt.

    With no tools the base prompt is returned unchanged. Otherwise one
    ``- name: description`` bullet is added per tool, followed by
    ``_QWEN_TOOL_CALLING_EXAMPLE``. ``context`` is not used.
    """
    if not tools:
        return base_prompt

    bullets = [
        f"- {spec['name']}: {spec.get('description', '')}" for spec in tools
    ]
    listing = "\n".join(bullets)
    intro = f"{base_prompt}\n\n"
    tool_section = f"You have access to these tools:\n{listing}\n\n"
    return intro + tool_section + f"{_QWEN_TOOL_CALLING_EXAMPLE}"
# parse_tool_calls method · python · L140-L174 (35 LOC)src/prometheus/adapter/formatter.py
def parse_tool_calls(self, raw_response: str) -> list[ToolUseBlock]:
"""Extract tool calls from Qwen's text output.
Handles:
- Clean JSON: {"name": "...", "arguments": {...}}
- JSON in markdown: ```json {...} ```
- Multiple calls separated by newlines
"""
results: list[ToolUseBlock] = []
# Find all ```json ... ``` blocks
for m in re.finditer(r"```(?:json)?\s*(\{.*?\})\s*```", raw_response, re.DOTALL):
block = _parse_tool_call_json(m.group(1))
if block:
results.append(block)
if results:
return results
# Whole response is a JSON object
stripped = raw_response.strip()
if stripped.startswith("{"):
block = _parse_tool_call_json(stripped)
if block:
return [block]
# Any line that is itself a JSON object
for line in raw_response.splitlines():
line = line.strip()
GemmaFormatter class · python · L188-L229 (42 LOC)src/prometheus/adapter/formatter.py
class GemmaFormatter(ModelPromptFormatter):
"""Formatter for Gemma models using Google's native function-calling tokens."""
def format_tools(self, tools: list[dict[str, Any]]) -> list[dict[str, Any]]:
"""Convert to Gemma's function declaration format."""
result = []
for t in tools:
result.append({
"function_declarations": [{
"name": t["name"],
"description": t.get("description", ""),
"parameters": t.get("input_schema", t.get("parameters", {})),
}]
})
return result
def format_system_prompt(
self,
base_prompt: str,
tools: list[dict[str, Any]],
context: dict[str, Any] | None = None,
) -> str:
if not tools:
return base_prompt
tool_list = "\n".join(
f"- {t['name']}: {t.get('description', '')}" for t in tools
)
return (
f"{base_pformat_tools method · python · L191-L202 (12 LOC)src/prometheus/adapter/formatter.py
def format_tools(self, tools: list[dict[str, Any]]) -> list[dict[str, Any]]:
    """Convert to Gemma's function declaration format.

    Each tool yields its own ``{"function_declarations": [...]}`` entry that
    holds a single declaration. The parameter schema comes from
    ``input_schema``, falling back to ``parameters`` and then to an empty
    dict.
    """
    return [
        {
            "function_declarations": [
                {
                    "name": spec["name"],
                    "description": spec.get("description", ""),
                    "parameters": spec.get("input_schema", spec.get("parameters", {})),
                }
            ]
        }
        for spec in tools
    ]
# format_system_prompt method · python · L204-L220 (17 LOC)src/prometheus/adapter/formatter.py
def format_system_prompt(
    self,
    base_prompt: str,
    tools: list[dict[str, Any]],
    context: dict[str, Any] | None = None,
) -> str:
    """Append a function listing and the Gemma tool example to the prompt.

    With no tools the base prompt is returned unchanged. Otherwise one
    ``- name: description`` bullet is added per tool, followed by
    ``_GEMMA_TOOL_EXAMPLE``. ``context`` is not used.
    """
    if not tools:
        return base_prompt

    bullets = [
        f"- {spec['name']}: {spec.get('description', '')}" for spec in tools
    ]
    listing = "\n".join(bullets)
    intro = f"{base_prompt}\n\n"
    function_section = f"Available functions:\n{listing}\n\n"
    return intro + function_section + f"{_GEMMA_TOOL_EXAMPLE}"
# Repobility — the code-quality scanner for AI-generated software · https://repobility.com
parse_tool_calls method · python · L222-L229 (8 LOC)src/prometheus/adapter/formatter.py
def parse_tool_calls(self, raw_response: str) -> list[ToolUseBlock]:
    """Parse <tool_call>...</tool_call> tags from Gemma output.

    Every tagged JSON object is handed to ``_parse_tool_call_json``; payloads
    it rejects are dropped. Calls are returned in order of appearance.
    """
    tag_pattern = r"<tool_call>\s*(\{.*?\})\s*</tool_call>"
    parsed = (
        _parse_tool_call_json(hit.group(1))
        for hit in re.finditer(tag_pattern, raw_response, re.DOTALL)
    )
    return [call for call in parsed if call]
# _parse_tool_call_json function · python · L236-L252 (17 LOC)src/prometheus/adapter/formatter.py
def _parse_tool_call_json(text: str) -> ToolUseBlock | None:
    """Try to parse a JSON string as a tool call. Returns None on failure.

    The tool name is read from ``name``, ``function`` or ``tool``; the
    arguments from ``arguments``, ``parameters`` or ``args`` (default: empty
    dict). Arguments given as a JSON-encoded string are decoded first.

    Returns:
        A ``ToolUseBlock`` with a fresh ``toolu_`` id, or ``None`` when the
        text is not valid JSON, is not a JSON object, has no string tool
        name, or its arguments do not resolve to a dict.
    """
    try:
        data = json.loads(text.strip())
    except json.JSONDecodeError:
        return None
    # Valid JSON that is not an object (list, number, string) has no .get();
    # treat it as "not a tool call" rather than raising AttributeError.
    if not isinstance(data, dict):
        return None

    name = data.get("name") or data.get("function") or data.get("tool")
    if not name or not isinstance(name, str):
        return None

    args = data.get("arguments") or data.get("parameters") or data.get("args") or {}
    if isinstance(args, str):
        # Some models emit the arguments as a JSON-encoded string.
        try:
            args = json.loads(args)
        except json.JSONDecodeError:
            return None
    if not isinstance(args, dict):
        return None

    return ToolUseBlock(
        id=f"toolu_{uuid4().hex[:12]}",
        name=name,
        input=args,
    )
# ModelAdapter class · python · L50-L279 (230 LOC)src/prometheus/adapter/__init__.py
class ModelAdapter:
"""High-level adapter that wires together all Sprint 3 components.
Three adapter tiers:
- "off" — API enforces structure (Anthropic, OpenAI). Skip everything.
- "light" — Model has native tool calling but server doesn't guarantee
structure (Gemma 4/Qwen on llama.cpp). GBNF on, validator
at NONE, enforcer ON (model may emit tool calls as text),
max_retries=1.
- "full" — Model lacks tool calling training. Full adapter pipeline.
Args:
formatter: Model-specific prompt formatter.
strictness: Validation strictness: "NONE" | "MEDIUM" | "STRICT".
max_retries: Max tool-call retries before giving up.
tier: Adapter tier: "off", "light", or "full". Overrides
strictness/max_retries if set.
"""
TIER_OFF = "off"
TIER_LIGHT = "light"
TIER_FULL = "full"
def __init__(
self,
formatter: ModelPromptF__init__ method · python · L73-L103 (31 LOC)src/prometheus/adapter/__init__.py
def __init__(
self,
formatter: ModelPromptFormatter | None = None,
strictness: str | Strictness = Strictness.NONE,
max_retries: int = 3,
adaptive_strictness: bool = False,
strictness_threshold: float = 0.8,
strictness_window: int = 100,
tier: str | None = None,
) -> None:
# If tier is explicitly set, override strictness and max_retries
if tier == self.TIER_OFF:
strictness = Strictness.NONE
max_retries = 0
adaptive_strictness = False
elif tier == self.TIER_LIGHT:
strictness = Strictness.NONE
max_retries = 1
adaptive_strictness = True
self.tier = tier or self.TIER_FULL
self.formatter = formatter or AnthropicFormatter()
self.validator = ToolCallValidator(strictness=strictness)
self.retry = RetryEngine(max_retries=max_retries)
self.enforcer = StructuredOutputEnforcer()
self._bformat_request method · python · L109-L127 (19 LOC)src/prometheus/adapter/__init__.py
def format_request(
    self,
    system_prompt: str,
    tools: list[dict[str, Any]],
) -> tuple[str, list[dict[str, Any]]]:
    """Format system prompt and tools for the current model.

    Tool schemas are rewritten by the formatter only for tier "full", where
    they go in the prompt; for the other tiers the server handles the tool
    format natively (--jinja / peg-gemma4) and the schemas pass through
    untouched. The system prompt always goes through the formatter.

    Returns (formatted_system_prompt, formatted_tools).
    """
    rewrite_schemas = self.tier == self.TIER_FULL
    wire_tools = self.formatter.format_tools(tools) if rewrite_schemas else tools

    # The formatter receives the ORIGINAL tools so it can read descriptions.
    prompt = self.formatter.format_system_prompt(system_prompt, tools)
    return prompt, wire_tools
# validate_and_repair method · python · L133-L157 (25 LOC)src/prometheus/adapter/__init__.py
def validate_and_repair(
self,
tool_name: str,
tool_input: Any,
tool_registry: Any,
) -> tuple[str, dict[str, Any], list[str]]:
"""Validate a tool call and auto-repair if needed.
Returns (final_tool_name, final_tool_input, repairs_made).
Raises ValueError if validation fails and repair also fails.
"""
if self.tier == self.TIER_OFF:
return tool_name, tool_input, []
# Use per-tool strictness if adaptive mode is on
if self._adaptive_strictness and tool_name:
effective = self.get_effective_strictness(tool_name)
if effective != self._base_strictness:
orig_strictness = self.validator.strictness
self.validator.strictness = effective
try:
return self._do_validate_and_repair(tool_name, tool_input, tool_registry)
finally:
self.validator.strictness = orig_strictn_do_validate_and_repair method · python · L159-L179 (21 LOC)src/prometheus/adapter/__init__.py
def _do_validate_and_repair(
    self,
    tool_name: str,
    tool_input: Any,
    tool_registry: Any,
) -> tuple[str, dict[str, Any], list[str]]:
    """Validate once, attempt one repair, and record the outcome.

    Returns ``(tool_name, tool_input, repairs_made)`` — the inputs with an
    empty repair list when the call was already valid, or the repaired name
    and input otherwise.

    Raises:
        ValueError: When validation fails and the repair does not succeed.
    """
    verdict = self.validator.validate(tool_name, tool_input, tool_registry)
    if not verdict.valid:
        fix = self.validator.repair(
            tool_name, tool_input, verdict.error, tool_registry
        )
        if not fix.repaired:
            self.record_tool_call(tool_name, success=False)
            raise ValueError(
                f"Tool call validation failed and could not be repaired: {verdict.error}"
            )
        # A repaired call counts as a success under the originally requested name.
        self.record_tool_call(tool_name, success=True)
        return fix.tool_name, fix.tool_input, fix.repairs_made

    self.record_tool_call(tool_name, success=True)
    return tool_name, tool_input, []
# generate_grammar method · python · L185-L194 (10 LOC)src/prometheus/adapter/__init__.py
def generate_grammar(self, tool_registry: Any) -> str | None:
    """Generate GBNF grammar from the current tool registry schemas.

    Returns ``None`` when the adapter tier is "off" (the API enforces
    structure, no grammar needed), when no registry is supplied, or when the
    registry exposes no schemas.
    """
    if self.tier == self.TIER_OFF or tool_registry is None:
        return None

    api_schemas = tool_registry.to_api_schema()
    return self.enforcer.generate_grammar(api_schemas) if api_schemas else None
# If a scraper extracted this row, it came from Repobility (https://repobility.com)
extract_tool_calls method · python · L200-L211 (12 LOC)src/prometheus/adapter/__init__.py
def extract_tool_calls(
    self,
    text: str,
    tool_registry: Any = None,
):
    """Extract tool calls from raw model text (for models that embed JSON in prose).

    Tier "off" always yields an empty list because the API guarantees
    structured output. For tiers "light" and "full" the model may emit tool
    calls as <tool_call> XML or JSON in the response text, so extraction is
    delegated to the enforcer.
    """
    api_is_structured = self.tier == self.TIER_OFF
    if not api_is_structured:
        return self.enforcer.extract_tool_calls(text, tool_registry)
    return []
# handle_retry method · python · L217-L226 (10 LOC)src/prometheus/adapter/__init__.py
def handle_retry(
    self,
    tool_name: str,
    error: str,
    tool_registry: Any,
) -> tuple[RetryAction, str]:
    """Decide whether to retry and build the retry prompt.

    With the adapter tier "off" the answer is always ``RetryAction.ABORT``
    plus a message naming the tier and the error; otherwise the decision is
    delegated to the retry engine.
    """
    if self.tier != self.TIER_OFF:
        return self.retry.handle_failure(tool_name, error, tool_registry)

    abort_reason = f"Adapter off (tier={self.tier}): {error}"
    return RetryAction.ABORT, abort_reason
# record_tool_call method · python · L232-L245 (14 LOC)src/prometheus/adapter/__init__.py
def record_tool_call(self, tool_name: str, success: bool) -> None:
    """Record a tool call outcome for adaptive strictness tuning.

    Does nothing unless adaptive strictness is enabled. Outcomes are kept
    per tool in a sliding window of ``_strictness_window`` entries. Once at
    least 10 outcomes are available, a success rate below
    ``_strictness_threshold`` triggers ``_bump_tool_strictness``.

    Args:
        tool_name: Tool whose outcome is being recorded.
        success: Whether the call validated (possibly after repair).
    """
    if not self._adaptive_strictness:
        return

    history = self._tool_call_history.setdefault(tool_name, [])
    history.append(success)

    # Keep only the last N calls. Trim IN PLACE: rebinding the dict entry to
    # a new slice would leave `history` pointing at the untrimmed list, so
    # the rate below would be computed over N+1 entries.
    window = self._strictness_window
    overflow = len(history) - window
    if window > 0 and overflow > 0:
        del history[:overflow]

    # Check if strictness needs bumping
    if len(history) >= 10:  # need at least 10 calls to judge
        rate = sum(history) / len(history)
        if rate < self._strictness_threshold:
            self._bump_tool_strictness(tool_name, rate)
# page 1 / 28next ›