Function bodies 49 total
main function · python · L30-L102 (73 LOC)examples/math_agent.py
async def main():
# -------------------------------------------------------------------------
# 1. Setup
# -------------------------------------------------------------------------
# Create SGLangModel with token-level trajectory tracking support
client = SGLangClient(base_url=os.environ.get("SGLANG_BASE_URL", "http://localhost:30000"))
model_info = await client.get_model_info()
tokenizer = AutoTokenizer.from_pretrained(model_info["model_path"])
model = SGLangModel(
client=client,
tokenizer=tokenizer,
tool_parser=HermesToolParser(),
sampling_params={"max_new_tokens": 16384}, # Limit response length
)
# -------------------------------------------------------------------------
# 2. Math 500 Example
# -------------------------------------------------------------------------
print("\n" + "=" * 60)
print("Math 500 Example")
print("=" * 60)
# Reset for new episode
model.reset()
# Crefind_drift_index function · python · L33-L40 (8 LOC)examples/retokenization_drift/main.py
def find_drift_index(original: list[int], re_encoded: list[int]) -> int | None:
"""Find first index where tokens diverge."""
for i, (a, b) in enumerate(zip(original, re_encoded)):
if a != b:
return i
if len(original) != len(re_encoded):
return min(len(original), len(re_encoded))
return Nonemain function · python · L43-L129 (87 LOC)examples/retokenization_drift/main.py
async def main():
model_id = os.environ.get("SGLANG_MODEL_ID", "Qwen/Qwen3-4B-Thinking-2507")
base_url = os.environ.get("SGLANG_BASE_URL", "http://localhost:30000")
print(f"Model: {model_id}")
print(f"Server: {base_url}\n")
tokenizer = AutoTokenizer.from_pretrained(model_id, trust_remote_code=True)
client = SGLangClient(base_url=base_url)
model = SGLangModel(
client=client,
tokenizer=tokenizer,
tool_parser=HermesToolParser(),
sampling_params={"max_new_tokens": 32768},
)
# Complex multi-step problem to induce extended thinking
problem = """
A farmer has 3 fields. Field A is 2.5 acres, Field B is 3.75 acres, Field C is 1.8 acres.
Crop yields per acre: Wheat=$450, Corn=$380, Soybeans=$520.
Costs per acre: Wheat=$120, Corn=$95, Soybeans=$150.
The farmer plants wheat in Field A, corn in Field B, and soybeans in Field C.
There's also a 15% tax on total profit.
Calculate: (1) Revenue per fiemain function · python · L44-L102 (59 LOC)examples/structured_output.py
async def main():
# -------------------------------------------------------------------------
# Setup
# -------------------------------------------------------------------------
client = SGLangClient(base_url=os.environ.get("SGLANG_BASE_URL", "http://localhost:30000"))
model_info = await client.get_model_info()
tokenizer = AutoTokenizer.from_pretrained(model_info["model_path"])
model = SGLangModel(
client=client,
tokenizer=tokenizer,
sampling_params={"max_new_tokens": 1024},
)
# -------------------------------------------------------------------------
# Example 1: Code Review (LLM-as-Judge)
# -------------------------------------------------------------------------
print("\n" + "=" * 60)
print("Example 1: Code Review")
print("=" * 60)
code_to_review = """
def fibonacci(n):
if n <= 1:
return n
return fibonacci(n-1) + fibonacci(n-2)
"""
prompt = [{"role": "user", "content": [{"text"read_image function · python · L38-L64 (27 LOC)examples/vlm_agent/vlm_agent.py
def read_image(file_path: str) -> dict:
    """Read an image file and return it for visual inspection.

    Args:
        file_path: Filename of the image (e.g., "a.jpg", "b.png").
    """
    # NOTE(review): the docstring text is left untouched on purpose; it may be
    # consumed as a tool description by a decorator outside this view — confirm.
    #
    # Returns a dict with "status" ("success" or "error") and a "content" list:
    # on success a text summary plus the raw image bytes, on failure a
    # "File not found" text entry.
    path = Path(file_path)
    # Use is_file() rather than exists(): a directory (including "", which
    # resolves to the current directory) would pass exists() and then make
    # read_bytes() raise instead of producing an error result.
    if not path.is_file():
        # Fall back to looking the bare filename up inside IMAGE_DIR.
        path = IMAGE_DIR / path.name
    if not path.is_file():
        return {
            "status": "error",
            "content": [{"text": f"File not found: {path}"}],
        }
    image_bytes = path.read_bytes()
    suffix = path.suffix.lower().lstrip(".")
    # Unrecognised extensions are reported as "png".
    fmt = {"jpg": "jpeg", "jpeg": "jpeg", "png": "png", "gif": "gif", "webp": "webp"}.get(suffix, "png")
    return {
        "status": "success",
        "content": [
            {"text": f"Image loaded: {path.name} ({len(image_bytes)} bytes)"},
            {"image": {"format": fmt, "source": {"bytes": image_bytes}}},
        ],
    }
# _summarize_for_display function · python · L67-L78 (12 LOC)examples/vlm_agent/vlm_agent.py
def _summarize_for_display(obj):
"""Make trajectory JSON-serializable with truncated binary data."""
if isinstance(obj, bytes):
return f"<{len(obj)} bytes>"
if isinstance(obj, dict):
return {k: _summarize_for_display(v) for k, v in obj.items()}
if isinstance(obj, list):
return [_summarize_for_display(v) for v in obj]
if isinstance(obj, str) and obj.startswith("data:image/") and ";base64," in obj:
prefix = obj[: obj.index(";base64,") + len(";base64,")]
return prefix + obj[len(prefix) : len(prefix) + 20] + "..."
return objmain function · python · L81-L208 (128 LOC)examples/vlm_agent/vlm_agent.py
async def main():
# -------------------------------------------------------------------------
# 1. Setup
# -------------------------------------------------------------------------
base_url = os.environ.get("SGLANG_BASE_URL", "http://localhost:30000")
client = SGLangClient(base_url=base_url)
model_info = await client.get_model_info()
model_path = os.environ.get("MODEL_PATH", model_info["model_path"])
processor = AutoProcessor.from_pretrained(model_path, trust_remote_code=True)
model = SGLangModel(
client=client,
processor=processor,
tool_parser=HermesToolParser(),
sampling_params={"max_new_tokens": 8192},
)
# -------------------------------------------------------------------------
# 2. Run VLM Agent
# -------------------------------------------------------------------------
print("\n" + "=" * 60)
print("VLM Agent Example")
print("=" * 60)
agent = Agent(
model=model,
Repobility · open methodology · https://repobility.com/research/
SGLangClient.__init__ method · python · L88-L122 (35 LOC)src/strands_sglang/client.py
def __init__(
self,
base_url: str,
*,
max_connections: int = DEFAULT_MAX_CONNECTIONS,
timeout: float | None = 900.0,
connect_timeout: float = 5.0,
max_retries: int = 60,
retry_delay: float = 1.0,
) -> None:
"""Initialize SGLang client.
Args:
base_url: SGLang server URL (e.g., "http://localhost:30000").
max_connections: Maximum concurrent connections (default: 1000).
timeout: Request timeout in seconds, or None for infinite (default: 900.0).
connect_timeout: TCP connection timeout in seconds (default: 5s).
max_retries: Maximum retry attempts on transient errors (default: 60, like slime).
retry_delay: Delay between retries in seconds (default: 1.0).
"""
self.base_url = base_url.rstrip("/")
self.max_retries = max_retries
self.retry_delay = retry_delay
# Store config for lazy session creatioSGLangClient._get_session method · python · L124-L132 (9 LOC)src/strands_sglang/client.py
def _get_session(self) -> aiohttp.ClientSession:
"""Get or create the aiohttp session (lazy initialization)."""
if self._session is None or self._session.closed:
self._session = aiohttp.ClientSession(
base_url=self.base_url,
timeout=aiohttp.ClientTimeout(total=self._timeout, connect=self._connect_timeout),
connector=aiohttp.TCPConnector(limit=self._max_connections),
)
return self._sessionSGLangClient.__del__ method · python · L140-L145 (6 LOC)src/strands_sglang/client.py
def __del__(self) -> None:
"""Sync cleanup to prevent aiohttp 'Unclosed client session' warnings at shutdown."""
if self._session is not None and not self._session.closed:
if self._session.connector is not None and not self._session.connector.closed:
self._session.connector._close()
self._session._connector = NoneSGLangClient._classify_http_error method · python · L156-L181 (26 LOC)src/strands_sglang/client.py
def _classify_http_error(status: int, body: str) -> SGLangHTTPError:
"""Classify an HTTP error into a specific custom exception.
This is the single source of truth for error classification. All HTTP errors
from SGLang are mapped to custom exceptions here, so that sglang.py never
needs to inspect raw status codes or response bodies.
Args:
status: HTTP status code.
body: Response body text.
Returns:
Appropriate SGLangHTTPError subclass instance.
"""
# Context length exceeded (400 + length keywords) — non-retryable
if status == 400:
body_lower = body.lower()
if any(p in body_lower for p in CONTEXT_LENGTH_PATTERNS):
return SGLangContextLengthError(f"Context length exceeded (400): {body}", status=status, body=body)
# Rate-limited or temporarily unavailable — retryable
if status in (429, 503):
return SGLangThroSGLangClient._is_retryable_error method · python · L183-L203 (21 LOC)src/strands_sglang/client.py
def _is_retryable_error(self, e: Exception) -> bool:
    """Decide whether a failed request should be attempted again.

    Aligned with slime's philosophy: retry aggressively on most errors.
    For local SGLang servers, most 400 errors are transient (weight reloading, memory pressure).

    Not retried:
        - HTTP errors whose status is in ``NON_RETRYABLE_STATUS_CODES``
        - ``SGLangContextLengthError`` (prompt too long, retrying won't help)

    Everything else is retried, including any exception that is not an
    ``SGLangHTTPError`` (connection, timeout and decoding failures).
    """
    if not isinstance(e, SGLangHTTPError):
        return True
    if e.status in NON_RETRYABLE_STATUS_CODES:
        return False
    # Remaining HTTP errors are retryable unless the prompt itself is too long.
    return not isinstance(e, SGLangContextLengthError)
# SGLangClient.generate method · python · L205-L276 (72 LOC)src/strands_sglang/client.py
async def generate(self, input_ids: list[int], **kwargs: Any) -> dict[str, Any]:
"""Generate from SGLang `/generate` endpoint.
Args:
input_ids: Input token IDs. Do not set `text` when `input_ids` is provided.
**kwargs: Additional parameters passed directly to SGLang (see full list in SGLang documentation).
Returns:
Response dict with text, output_ids, meta_info (logprobs, finish_reason, etc.).
Raises:
SGLangContextLengthError: When prompt exceeds model's maximum context length.
SGLangThrottledError: On 429 or 503 responses.
SGLangHTTPError: For non-retryable HTTP errors (401, 403, 404) or after all retries exhausted.
SGLangConnectionError: For connection/timeout failures after retries exhausted.
SGLangDecodingError: When server returns non-JSON response after retries exhausted.
"""
payload: dict[str, Any] = {
"input_ids": inpSGLangClient.health method · python · L278-L289 (12 LOC)src/strands_sglang/client.py
async def health(self) -> bool:
    """Probe the server's `/health` endpoint.

    Returns:
        True when the endpoint answers with status 200; False for any other
        status or when the request raises.
    """
    healthy = False
    try:
        async with self._get_session().get("/health") as response:
            healthy = response.status == 200
    except Exception:
        # Best-effort probe: any failure is reported as "not healthy".
        healthy = False
    return healthy
# SGLangClient.get_model_info method · python · L291-L307 (17 LOC)src/strands_sglang/client.py
async def get_model_info(self) -> dict[str, Any] | None:
    """Fetch model metadata from the server's `/get_model_info` endpoint.

    Returns:
        The decoded JSON body, or None when the response status is 400 or
        above or when the request or decoding raises.
        Important fields include:
        - model_path: HuggingFace model ID or local path
        - tokenizer_path: Tokenizer path (may differ from model_path)
    """
    try:
        async with self._get_session().get("/get_model_info") as response:
            if response.status < 400:
                # content_type=None is forwarded to the response's json().
                return await response.json(content_type=None)
            return None
    except Exception:
        # Best-effort lookup: callers must handle a None result.
        return None
SGLangModel.__init__ method · python · L91-L124 (34 LOC)src/strands_sglang/sglang.py
def __init__(
self,
*,
client: SGLangClient,
tokenizer: PreTrainedTokenizerBase | None = None,
processor: ProcessorMixin | None = None,
tool_parser: ToolParser | None = None,
**config: Unpack[SGLangConfig],
) -> None:
"""Initialize SGLang model provider.
Args:
client: `SGLangClient` for HTTP communication with the SGLang server.
tokenizer: HuggingFace tokenizer for chat template and tokenization (optional if processor is provided).
processor: HuggingFace processor for multimodal processing.
tool_parser: `ToolParser` for tool calls (default: `HermesToolParser`).
**config: Additional SGLang generation configuration.
"""
self.client = client
self.processor = processor
self.tokenizer = (processor and processor.tokenizer) or tokenizer
if not self.tokenizer:
raise ValueError("Either tokenizer (text-onSGLangModel.reset method · python · L126-L135 (10 LOC)src/strands_sglang/sglang.py
def reset(self) -> None:
    """Prepare the model for a fresh episode.

    Resets the token manager first, then zeroes the processed-message counter
    and replaces the tool parse error map and the image data list with new
    empty containers.
    """
    self.token_manager.reset()
    self._processed_message_count, self.tool_parse_errors, self.image_data = 0, {}, []
# SGLangModel.update_config method · python · L147-L153 (7 LOC)src/strands_sglang/sglang.py
def update_config(self, **model_config: Unpack[SGLangConfig]) -> None:  # type: ignore[override]
    """Merge configuration overrides into the current model configuration.

    Args:
        **model_config: Configuration overrides; each key replaces or adds
            the matching entry in ``self.config``.
    """
    for option, value in model_config.items():
        self.config[option] = value
# SGLangModel.get_config method · python · L156-L162 (7 LOC)src/strands_sglang/sglang.py
def get_config(self) -> SGLangConfig:
    """Return the current model configuration.

    Returns:
        ``self.config`` itself (not a copy), typed as ``SGLangConfig``.
    """
    current = self.config
    # cast() only informs the type checker; the object is returned as-is.
    return cast(SGLangModel.SGLangConfig, current)
# SGLangModel.format_content_block method · python · L169-L191 (23 LOC)src/strands_sglang/sglang.py
def format_content_block(
cls, content: ContentBlock | ToolResultContent, is_multimodal: bool = False
) -> dict[str, Any] | str:
"""Convert a single Strands `ContentBlock` or `ToolResultContent` to HF chat template format."""
# keep dict structure for multimodal content
hf_content = {}
match content:
case {"text": text}:
hf_content = {"type": "text", "text": text}
case {"image": image}:
mime = f"image/{image['format']}"
encoded = base64.b64encode(image["source"]["bytes"]).decode()
hf_content = {"type": "image", "image": f"data:{mime};base64,{encoded}"}
case {"json": data}:
# json only for tool results
hf_content = {"type": "text", "text": json.dumps(data)}
# TODO: add support for other content types
case _:
raise TypeError(f"content_type=<{next(iter(content))}> | unsuppSGLangModel.format_messages method · python · L194-L225 (32 LOC)src/strands_sglang/sglang.py
def format_messages(
cls, messages: Messages, system_prompt: str | None = None, is_multimodal: bool = False
) -> list[dict[str, Any]]:
"""Convert Strands Messages to HF chat template format.
When ``is_multimodal=False`` (default), content is flattened to a plain string.
When ``is_multimodal=True``, content is kept as a list of dicts.
"""
result: list[dict[str, Any]] = []
if system_prompt:
result.append({"role": "system", "content": system_prompt})
# Each Strands message is {"role": str, "content": [ContentBlock, ...]}
# One Strands message maps to one HF message, except toolResult blocks
# which each become a separate HF message with role="tool".
for msg in messages:
if "toolResult" in msg["content"][0]:
# Each toolResult → its own HF message (different tool_call_id)
for cb in msg["content"]:
assert "toolResult" iSGLangModel.format_tool_specs method · python · L227-L239 (13 LOC)src/strands_sglang/sglang.py
def format_tool_specs(self, tool_specs: list[ToolSpec]) -> list[dict]:
    """Convert strands ToolSpecs into OpenAI-style function entries for chat templates.

    Args:
        tool_specs: Specs providing ``name``, ``description`` and
            ``inputSchema["json"]``.

    Returns:
        One ``{"type": "function", "function": {...}}`` dict per spec, in
        input order.
    """
    formatted: list[dict] = []
    for spec in tool_specs:
        function_def = {
            "name": spec["name"],
            "description": spec["description"],
            "parameters": spec["inputSchema"]["json"],
        }
        formatted.append({"type": "function", "function": function_def})
    return formatted
# SGLangModel.format_prompt method · python · L241-L264 (24 LOC)src/strands_sglang/sglang.py
def format_prompt(
self,
messages: Messages,
system_prompt: str | None = None,
tools: list[dict] | None = None,
) -> str:
"""Format messages into a prompt ready for model generation.
Applies the HuggingFace chat template with `add_generation_prompt=True`,
which appends the assistant turn prefix for the model to continue.
The result is manually tokenized (not model-generated) and added to
the token trajectory with `loss_mask=False`.
"""
chat_messages = self.format_messages(messages, system_prompt, is_multimodal=self.is_multimodal)
self.image_data.extend(self.extract_image_urls(chat_messages))
# TODO: add support for other modalities later
return self.tokenizer.apply_chat_template(
conversation=chat_messages,
tools=tools,
add_generation_prompt=True,
tokenize=False,
enable_thinking=self.config.get("enable_thinRepobility · severity-and-effort ranking · https://repobility.com
SGLangModel.extract_image_urls method · python · L267-L278 (12 LOC)src/strands_sglang/sglang.py
def extract_image_urls(messages: list[dict[str, Any]]) -> list[str]:
    """Collect the image data URLs found in HF-formatted multimodal messages.

    A message's ``content`` may be a single dict or a list of parts; every
    dict whose ``type`` is ``"image"`` contributes its ``image`` value.
    Other content shapes (e.g. plain strings) are ignored.
    """
    urls: list[str] = []
    for message in messages:
        content = message.get("content")
        if isinstance(content, dict):
            parts = [content]
        elif isinstance(content, list):
            parts = content
        else:
            continue
        urls.extend(
            part["image"]
            for part in parts
            if isinstance(part, dict) and part.get("type") == "image"
        )
    return urls
# SGLangModel.tokenize_prompt_messages method · python · L284-L316 (33 LOC)src/strands_sglang/sglang.py
def tokenize_prompt_messages(
self,
messages: Messages,
system_prompt: str | None,
tools: list[dict] | None = None,
) -> list[int] | None:
"""Tokenize prompt messages for the next generation call.
First call: tokenizes full prompt with system prompt and tools.
Subsequent calls: tokenizes only new messages (tool results, user messages),
prepending the message separator to align with chat template formatting.
For VLM (when ``self.processor`` is set), uses the processor to insert
image placeholder tokens based on ``self.image_data``.
"""
def _tokenize(text: str) -> list[int]:
if self.processor:
return self.processor(text=text, images=self.image_data or None)["input_ids"][0]
return self.tokenizer.encode(text, add_special_tokens=False)
# First call: full prompt with tools
if len(self.token_manager) == 0:
formatted = sSGLangModel._sort_tool_results method · python · L318-L333 (16 LOC)src/strands_sglang/sglang.py
def _sort_tool_results(self, messages: Messages) -> Messages:
"""Sort tool results by ID to match original call order (IDs are sequential: call_0000, call_0001, ...)."""
result = []
for msg in messages:
if msg.get("role") != "user" or not isinstance(msg.get("content"), list):
result.append(msg)
continue
content = msg["content"]
tool_results = [b for b in content if isinstance(b, dict) and "toolResult" in b]
if not tool_results:
result.append(msg)
continue
other = [b for b in content if not (isinstance(b, dict) and "toolResult" in b)]
tool_results.sort(key=lambda b: b.get("toolResult", {}).get("toolUseId", ""))
result.append({**msg, "content": other + tool_results})
return resultSGLangModel._yield_tool_use_events method · python · L335-L371 (37 LOC)src/strands_sglang/sglang.py
def _yield_tool_use_events(
self,
tool_calls: list[ToolParseResult],
) -> Iterator[StreamEvent]:
"""Yield toolUse stream events for parsed tool calls.
Each tool call emits three events following the Strands streaming protocol:
- `contentBlockStart`: begins block with toolUseId and name
- `contentBlockDelta`: contains the tool input (delta = incremental data)
- `contentBlockStop`: ends the block
"""
for tool_call in tool_calls:
if tool_call.is_error:
logger.warning(f"Tool parse error for '{tool_call.name}': {(tool_call.raw or '')[:100]}")
# Track parse error count per tool name
self.tool_parse_errors[tool_call.name] = self.tool_parse_errors.get(tool_call.name, 0) + 1
yield {
"contentBlockStart": {
"start": {
"toolUse": {
"toolUseId": tool_call.id,
SGLangModel._extract_logprobs method · python · L373-L379 (7 LOC)src/strands_sglang/sglang.py
def _extract_logprobs(self, event: dict[str, Any], key: str) -> list[float] | None:
"""Extract logprobs from SGLang event (format: [[logprob, token_id, ...], ...])."""
meta_info = event.get("meta_info", {})
logprobs = meta_info.get(key) or event.get(key)
if isinstance(logprobs, list) and logprobs:
return [entry[0] for entry in logprobs]
return NoneSGLangModel.stream method · python · L382-L473 (92 LOC)src/strands_sglang/sglang.py
async def stream(
self,
messages: Messages,
tool_specs: list[ToolSpec] | None = None,
system_prompt: str | None = None,
*,
tool_choice: ToolChoice | None = None,
system_prompt_content: list[SystemContentBlock] | None = None,
**kwargs: Any,
) -> AsyncIterable[StreamEvent]:
"""Chat completion with SGLangModel using the `/generate` endpoint.
The `stream` method follows Strands' protocol but actually disabled here for training-only usage.
This means users won't see streaming behavior such as print callbacks.
"""
# Prepare request
tools = self.format_tool_specs(tool_specs) if tool_specs else None
config = self.get_config()
sampling_params: dict[str, Any] = dict(config.get("sampling_params") or {})
return_logprob = config.get("return_logprob", True)
new_input_tokens = self.tokenize_prompt_messages(messages, system_prompt, tools=tools)
SGLangModel.structured_output method · python · L476-L532 (57 LOC)src/strands_sglang/sglang.py
async def structured_output(
self,
output_model: Type[T],
prompt: Messages,
system_prompt: str | None = None,
**kwargs: Any,
) -> AsyncGenerator[dict[str, T | Any], None]:
"""Get structured output using SGLang's constrained decoding.
Uses SGLang's `json_schema` parameter for FSM-based constrained generation,
guaranteeing output conforms to the Pydantic model schema.
Note: This method does NOT update token_manager (no TITO tracking).
Intended for inference-only use cases like LLM-as-Judge.
Args:
output_model: Pydantic model class defining the output schema.
prompt: Messages to send to the model.
system_prompt: Optional system prompt.
**kwargs: Additional arguments (unused).
Yields:
Single dict with "output" key containing the parsed Pydantic model instance.
Raises:
ValidationError: If model output faiTokenManager.add_prompt method · python · L82-L102 (21 LOC)src/strands_sglang/token.py
def add_prompt(self, token_ids: list[int], logprobs: list[float] | None = None) -> None:
    """Append a prompt segment (system messages, user input, tool results).

    Every token in the segment is stored with ``loss_mask=False``. An empty
    ``token_ids`` is a no-op.

    Args:
        token_ids: Token IDs for this segment.
        logprobs: Optional log probabilities, one per token.

    Raises:
        ValueError: If ``logprobs`` is given and its length differs from
            ``token_ids``.
    """
    if not token_ids:
        return
    if logprobs is not None and len(logprobs) != len(token_ids):
        raise ValueError(f"logprobs length ({len(logprobs)}) must match token_ids length ({len(token_ids)})")
    # Pair each token with its logprob, or with None when none were supplied.
    per_token = logprobs if logprobs is not None else [None] * len(token_ids)
    segment = [
        Token(token_id=token_id, logprob=logprob, loss_mask=False)
        for token_id, logprob in zip(token_ids, per_token)
    ]
    self._segments.append(segment)
TokenManager.add_response method · python · L104-L130 (27 LOC)src/strands_sglang/token.py
def add_response(self, token_ids: list[int], logprobs: list[float] | None = None) -> None:
"""Add a response segment (model output).
Args:
token_ids: Token IDs for this segment.
logprobs: Optional log probabilities for each token.
Raises:
RuntimeError: If no prompt segment has been added yet.
ValueError: If logprobs length doesn't match token_ids length.
"""
if not token_ids:
return
if not self._segments:
raise RuntimeError("First segment must be a prompt. Call add_prompt() before add_response().")
if logprobs is not None and len(logprobs) != len(token_ids):
raise ValueError(f"logprobs length ({len(logprobs)}) must match token_ids length ({len(token_ids)})")
tokens = [
Token(
token_id=tid,
logprob=logprobs[i] if logprobs is not None else None,
loss_mask=True,
)
TokenManager.__repr__ method · python · L183-L188 (6 LOC)src/strands_sglang/token.py
def __repr__(self) -> str:
"""Return string representation."""
n_segments = len(self._segments)
n_tokens = len(self)
n_output = sum(1 for token in self.tokens if token.loss_mask)
return f"TokenManager(segments={n_segments}, tokens={n_tokens}, output_tokens={n_output})"ToolLimiter.__init__ method · python · L73-L93 (21 LOC)src/strands_sglang/tool_limiter.py
def __init__(
self,
max_tool_iters: int | None = None,
max_tool_calls: int | None = None,
max_parallel_tool_calls: int | None = None,
):
"""Initialize the limiter.
Args:
max_tool_iters: Maximum number of tool iterations allowed.
One iteration = one model response with tool calls + execution.
Parallel tool calls count as one iteration. None means no limit.
max_tool_calls: Maximum number of individual tool calls allowed.
Each tool call counts individually regardless of parallelism. None means no limit.
max_parallel_tool_calls: Maximum number of parallel tool calls allowed per model response. Excess calls are cancelled and returned to the
model as error results. None means no limit.
"""
self.max_tool_iters = max_tool_iters
self.max_tool_calls = max_tool_calls
self.max_parallel_tool_calls = max_parallel_ToolLimiter.reset method · python · L95-L100 (6 LOC)src/strands_sglang/tool_limiter.py
def reset(self) -> None:
    """Zero all counters ahead of a new invocation.

    Clears the iteration, tool-call, per-response parallel-call and
    cancelled-call counters.
    """
    self.tool_iter_count = self.tool_call_count = 0
    self._parallel_call_count = self.cancelled_tool_call_count = 0
# ToolLimiter._on_message_added method · python · L107-L149 (43 LOC)src/strands_sglang/tool_limiter.py
def _on_message_added(self, event: MessageAddedEvent) -> None:
"""Count iterations/calls and raise when limit exceeded.
- Counts on assistant messages with toolUse (model requesting tools)
- Raises on user messages with toolResult (iteration complete)
"""
message = event.message
content = message.get("content", [])
if not isinstance(content, list):
return
# Count when model requests tools
if message.get("role") == "assistant":
cur_tool_call_count = 0
for c in content:
if c.get("toolUse"):
cur_tool_call_count += 1
if cur_tool_call_count > 0:
self.tool_iter_count += 1
self.tool_call_count += cur_tool_call_count
self._parallel_call_count = 0 # Reset parallel call counter for new model response
logger.debug(
f"Iteration {self.tool_iter_count} stToolLimiter._on_before_tool_call method · python · L151-L164 (14 LOC)src/strands_sglang/tool_limiter.py
def _on_before_tool_call(self, event: BeforeToolCallEvent) -> None:
"""Cancel excess tool calls when parallel call limit is reached."""
if self.max_parallel_tool_calls is None:
return
self._parallel_call_count += 1
if self._parallel_call_count > self.max_parallel_tool_calls:
self.cancelled_tool_call_count += 1
event.cancel_tool = (
f"Max parallel tool calls ({self.max_parallel_tool_calls}) reached. This tool call was not executed."
)
logger.debug(
f"Cancelled tool call (parallel count {self._parallel_call_count}, limit {self.max_parallel_tool_calls})"
)ToolParseResult.from_parse_error method · python · L47-L55 (9 LOC)src/strands_sglang/tool_parsers/base.py
def from_parse_error(cls, id: str, raw: str, name: str | None = None) -> ToolParseResult:
    """Build the result object that represents a failed tool-call parse.

    Args:
        id: Tool call ID.
        raw: The unparseable raw content (fed back to model for self-correction).
        name: Best-effort extracted tool name; ``cls.UNKNOWN_NAME`` is used
            only when this is None (an empty string is kept as-is).
    """
    if name is None:
        resolved_name = cls.UNKNOWN_NAME
    else:
        resolved_name = name
    return cls(id=id, name=resolved_name, input={}, raw=raw)
# ToolParser.__init__ method · python · L93-L122 (30 LOC)src/strands_sglang/tool_parsers/base.py
def __init__(
self,
tool_start_token: str = DEFAULT_TOOL_START_TOKEN,
tool_end_token: str = DEFAULT_TOOL_END_TOKEN,
think_start_token: str = DEFAULT_THINK_START_TOKEN,
think_end_token: str = DEFAULT_THINK_END_TOKEN,
) -> None:
"""Initialize the parser with optional custom tokens.
Args:
tool_start_token: Opening token for tool calls.
tool_end_token: Closing token for tool calls.
think_start_token: Opening token for think blocks.
think_end_token: Closing token for think blocks.
"""
self.tool_start_token = tool_start_token
self.tool_end_token = tool_end_token
self.think_start_token = think_start_token
self.think_end_token = think_end_token
# Pattern to extract tool call content (with whitespace trimming)
self.tool_pattern = re.compile(
rf"{re.escape(tool_start_token)}\s*(.*?)\s*{re.escape(tool_end_token)}",
Repobility · open methodology · https://repobility.com/research/
ToolParser.parse method · python · L137-L146 (10 LOC)src/strands_sglang/tool_parsers/base.py
def parse(self, text: str) -> list[ToolParseResult]:
    """Parse tool calls from model output text.

    The body here is only ``...``; concrete parsers supply the real logic.
    NOTE(review): presumably declared abstract by a decorator outside this
    view — confirm.

    Args:
        text: Model output text.

    Returns:
        List of parsed tool call results.
    """
    ...
# register_tool_parser function · python · L149-L168 (20 LOC)src/strands_sglang/tool_parsers/base.py
def register_tool_parser(name: str) -> Callable[[type[T]], type[T]]:
    """Create a class decorator that records a parser under ``name``.

    The decorated class is stored in ``TOOL_PARSER_REGISTRY[name]`` (replacing
    any existing entry for that name) and returned unchanged.

    Args:
        name: Registry name for the parser.

    Returns:
        Decorator that registers the class and returns it unchanged.

    Example:
        >>> @register_tool_parser("my_parser")
        ... class MyParser(ToolParser):
        ...     def parse(self, text): ...
    """

    def _register(parser_cls: type[T]) -> type[T]:
        TOOL_PARSER_REGISTRY[name] = parser_cls
        return parser_cls

    return _register
# get_tool_parser function · python · L171-L191 (21 LOC)src/strands_sglang/tool_parsers/base.py
def get_tool_parser(name: str, **kwargs: Any) -> ToolParser:
    """Instantiate the tool parser registered under ``name``.

    Args:
        name: Parser name (e.g., "hermes", "qwen_xml").
        **kwargs: Arguments passed to the parser constructor.

    Returns:
        Instantiated parser.

    Raises:
        KeyError: If parser name is not registered; the message lists the
            registered names in sorted order.

    Example:
        >>> parser = get_tool_parser("hermes")
        >>> parser = get_tool_parser("hermes", think_start_token="<reasoning>")
    """
    if name in TOOL_PARSER_REGISTRY:
        parser_cls = TOOL_PARSER_REGISTRY[name]
        return parser_cls(**kwargs)
    available = ", ".join(sorted(TOOL_PARSER_REGISTRY))
    raise KeyError(f"Unknown tool parser: {name!r}. Available: {available}")
# GLMToolParser.parse method · python · L76-L121 (46 LOC)src/strands_sglang/tool_parsers/glm.py
def parse(self, text: str) -> list[ToolParseResult]:
"""Parse tool calls from GLM model output.
Extracts the function name from the first line after ``<tool_call>``,
then parses ``<arg_key>``/``<arg_value>`` pairs into a dict.
Args:
text: Model output text.
Returns:
List of tool call results (successful and errors).
"""
# Remove think blocks to avoid parsing draft tool calls from reasoning
text = self.think_pattern.sub("", text)
tool_calls: list[ToolParseResult] = []
for i, match in enumerate(self.tool_pattern.finditer(text)):
raw_content = match.group(1).strip()
tool_call_id = f"call_{i:04d}" # Sequential IDs for sortability
# Function name is on the first line
lines = raw_content.split("\n", 1)
name = lines[0].strip()
# Check if name is missing or contains XML tags (indicating we picked up arg tags HermesToolParser.parse method · python · L63-L116 (54 LOC)src/strands_sglang/tool_parsers/hermes.py
def parse(self, text: str) -> list[ToolParseResult]:
"""Parse tool calls from model output.
Args:
text: Model output text.
Returns:
List of tool call results (successful and errors).
"""
# Remove think blocks to avoid parsing draft tool calls from reasoning
text = self.think_pattern.sub("", text)
tool_calls: list[ToolParseResult] = []
for i, match in enumerate(self.tool_pattern.finditer(text)):
raw_content = match.group(1).strip()
tool_call_id = f"call_{i:04d}" # Sequential IDs for sortability
# Only handle JSONDecodeError - let Strands validate the rest
try:
call_json = json.loads(raw_content)
except json.JSONDecodeError as e:
name_match = self._NAME_PATTERN.search(raw_content)
name = name_match.group(1) if name_match else ToolParseResult.UNKNOWN_NAME
logger.warning(QwenXMLToolParser.parse method · python · L71-L120 (50 LOC)src/strands_sglang/tool_parsers/qwen_xml.py
def parse(self, text: str) -> list[ToolParseResult]:
"""Parse tool calls from model output.
Args:
text: Model output text.
Returns:
List of tool call results (successful and errors).
"""
# Remove think blocks to avoid parsing draft tool calls from reasoning
text = self.think_pattern.sub("", text)
tool_calls: list[ToolParseResult] = []
for i, match in enumerate(self.tool_pattern.finditer(text)):
raw_content = match.group(1).strip()
tool_call_id = f"call_{i:04d}" # Sequential IDs for sortability
# Parse the function tag
func_match = self._FUNCTION_PATTERN.search(raw_content)
if not func_match:
logger.warning("Tool parse error: missing <function=...> tag")
tool_calls.append(ToolParseResult.from_parse_error(id=tool_call_id, raw=raw_content))
continue
func_name = func_match.gget_client function · python · L29-L46 (18 LOC)src/strands_sglang/utils.py
def get_client(
    base_url: str,
    *,
    max_connections: int = DEFAULT_MAX_CONNECTIONS,
    timeout: float | None = 900.0,
    connect_timeout: float = 5.0,
    max_retries: int = 60,
    retry_delay: float = 1.0,
) -> SGLangClient:
    """Get a shared (cached) `SGLangClient` for connection pooling.

    Every argument is forwarded unchanged to the ``SGLangClient`` constructor.
    NOTE(review): no caching happens in this body; it presumably comes from a
    decorator outside this view — confirm.
    """
    settings = {
        "base_url": base_url,
        "max_connections": max_connections,
        "timeout": timeout,
        "connect_timeout": connect_timeout,
        "max_retries": max_retries,
        "retry_delay": retry_delay,
    }
    return SGLangClient(**settings)
# get_client_from_slime_args function · python · L49-L70 (22 LOC)src/strands_sglang/utils.py
def get_client_from_slime_args(
    args: Any,
    *,
    timeout: float | None = 900.0,
    connect_timeout: float = 5.0,
    max_retries: int = 60,
    retry_delay: float = 1.0,
) -> SGLangClient:
    """Get a shared (cached) `SGLangClient` from `slime`'s training args.

    Matches slime's :func:`init_http_client` formula for connection pooling.

    The base URL is built from ``args.sglang_router_ip`` and
    ``args.sglang_router_port``; the connection limit is
    ``sglang_server_concurrency * rollout_num_gpus // rollout_num_gpus_per_engine``.
    The remaining keyword arguments are forwarded to ``get_client``.
    """
    router_url = f"http://{args.sglang_router_ip}:{args.sglang_router_port}"
    # Multiply before the floor division, exactly as in the slime formula.
    pool_size = args.sglang_server_concurrency * args.rollout_num_gpus // args.rollout_num_gpus_per_engine
    return get_client(
        base_url=router_url,
        max_connections=int(pool_size),
        timeout=timeout,
        connect_timeout=connect_timeout,
        max_retries=max_retries,
        retry_delay=retry_delay,
    )
get_tokenizer function · python · L74-L85 (12 LOC)src/strands_sglang/utils.py
def get_tokenizer(tokenizer_path: str) -> PreTrainedTokenizer:
    """Load a HuggingFace tokenizer.

    NOTE(review): the original summary calls this "shared (cached)"; no
    caching happens in this body, so it presumably comes from a decorator
    outside this view — confirm.

    Args:
        tokenizer_path: Path or HuggingFace model ID for the tokenizer.

    Returns:
        Tokenizer instance produced by ``AutoTokenizer.from_pretrained``.
    """
    # Imported lazily so importing this module does not import transformers.
    from transformers import AutoTokenizer

    # trust_remote_code=True lets the model repository run its own code.
    load = AutoTokenizer.from_pretrained
    return load(tokenizer_path, trust_remote_code=True)
# get_processor function · python · L89-L100 (12 LOC)src/strands_sglang/utils.py
def get_processor(processor_path: str) -> ProcessorMixin:
    """Load a HuggingFace multimodal processor.

    NOTE(review): the original summary calls this "shared (cached)"; no
    caching happens in this body, so it presumably comes from a decorator
    outside this view — confirm.

    Args:
        processor_path: Path or HuggingFace model ID.

    Returns:
        Processor instance produced by ``AutoProcessor.from_pretrained``.
    """
    # Imported lazily so importing this module does not import transformers.
    from transformers import AutoProcessor

    # trust_remote_code=True lets the model repository run its own code.
    load = AutoProcessor.from_pretrained
    return load(processor_path, trust_remote_code=True)