Function bodies 11 total
instrument_command function · python · L42-L75 (34 LOC) · src/aumai_otel_genai/cli.py
def instrument_command(
provider: str, exporter_type: str, demo: bool
) -> None:
"""Set up OTel instrumentation for an LLM provider."""
from opentelemetry.sdk.trace.export import ConsoleSpanExporter
from opentelemetry.sdk.trace.export.in_memory_span_exporter import (
InMemorySpanExporter,
)
exporter = (
InMemorySpanExporter() if exporter_type == "memory" else ConsoleSpanExporter()
)
instrumentor = GenAIInstrumentor(exporter=exporter)
instrumentor.instrument(provider)
click.echo(f"Instrumented provider: {provider}")
click.echo(f"Exporter : {exporter_type}")
if demo:
attrs = GenAISpanAttributes(
model="demo-model",
provider=provider,
input_tokens=100,
output_tokens=50,
cost_usd=0.002,
temperature=0.7,
finish_reason="stop",
latency_ms=0.0,
)
with instrumentor.create_span("chat.completions", attmetrics_command function · python · L94-L119 (26 LOC)src/aumai_otel_genai/cli.py
def metrics_command(usage_path: str | None, output_fmt: str) -> None:
"""Collect and display GenAI metrics."""
collector = GenAIMetricsCollector()
if usage_path:
for line in Path(usage_path).read_text().splitlines():
line = line.strip()
if not line:
continue
event = json.loads(line)
collector.record(
provider=event.get("provider", "unknown"),
model=event.get("model", "unknown"),
input_tokens=int(event.get("input_tokens", 0)),
output_tokens=int(event.get("output_tokens", 0)),
latency_ms=float(event.get("latency_ms", 0.0)),
error=bool(event.get("error", False)),
)
if output_fmt == "prometheus":
click.echo(collector.render_prometheus())
else:
all_m = {
k: v.model_dump() for k, v in collector.all_metrics().items()
}
click.echo(json.dumps(all_m, indGenAIMetricsCollector.record method · python · L43-L61 (19 LOC)src/aumai_otel_genai/core.py
def record(
    self,
    provider: str,
    model: str,
    input_tokens: int,
    output_tokens: int,
    latency_ms: float,
    error: bool = False,
) -> None:
    """Fold one GenAI request into the aggregate for its provider/model pair.

    Aggregates are kept in ``self._metrics`` under the key
    ``"<provider>/<model>"``. The first request for a pair creates a new
    ``GenAIMetrics``; the token counts, latency and error flag are then
    passed unchanged to that aggregate's ``record_request``.
    """
    series_key = f"{provider}/{model}"
    series = self._metrics.get(series_key)
    if series is None:
        # First request seen for this provider/model pair.
        series = GenAIMetrics()
        self._metrics[series_key] = series
    series.record_request(
        input_tokens=input_tokens,
        output_tokens=output_tokens,
        latency_ms=latency_ms,
        error=error,
    )
# GenAIMetricsCollector.render_prometheus method · python · L72-L109 (38 LOC) src/aumai_otel_genai/core.py
def render_prometheus(self) -> str:
"""
Render metrics in Prometheus text format (exposition format v0.0.4).
Example output::
# HELP genai_requests_total Total GenAI requests
# TYPE genai_requests_total counter
genai_requests_total{provider="openai",model="gpt-4"} 42
"""
lines: list[str] = []
def _label(key: str) -> str:
provider, model = key.split("/", 1)
return f'provider="{provider}",model="{model}"'
lines += [
"# HELP genai_requests_total Total GenAI requests",
"# TYPE genai_requests_total counter",
]
for key, m in self._metrics.items():
lines.append(f"genai_requests_total{{{_label(key)}}} {m.request_count}")
lines += [
"# HELP genai_tokens_total Total tokens consumed",
"# TYPE genai_tokens_total counter",
]
for key, m in self._metrics.items():
lineGenAIInstrumentor.__init__ method · python · L126-L134 (9 LOC)src/aumai_otel_genai/core.py
def __init__(
    self,
    exporter: SpanExporter | None = None,
    use_batch: bool = False,
) -> None:
    """Store the exporter and processing mode; no tracer provider is built yet.

    Args:
        exporter: Span exporter that finished spans are sent to. When
            ``None``, a ``ConsoleSpanExporter`` is created instead.
        use_batch: Stored as ``self._use_batch`` for later use when the
            span processor is chosen.
    """
    # Stays None until a TracerProvider is created.
    self._provider: TracerProvider | None = None
    # Compare against None explicitly: a truthiness test would silently
    # discard a caller-supplied exporter that happens to evaluate falsy.
    self._exporter = ConsoleSpanExporter() if exporter is None else exporter
    self._use_batch = use_batch
    self._instrumented_providers: set[str] = set()
# GenAIInstrumentor.instrument method · python · L136-L158 (23 LOC) src/aumai_otel_genai/core.py
def instrument(self, provider: str) -> None:
    """
    Register *provider* and make sure a TracerProvider exists.

    The first call builds a TracerProvider whose resource carries the
    service name and that first provider as ``gen_ai.system``, attaches a
    batch or simple span processor around the configured exporter, and
    passes the provider to ``trace.set_tracer_provider``. Later calls only
    add *provider* to the set of instrumented providers.
    """
    if self._provider is None:
        self._provider = TracerProvider(
            resource=Resource.create(
                {
                    "service.name": "aumai-otel-genai",
                    "gen_ai.system": provider,
                }
            )
        )
        # Only the selected processor class name is looked up.
        processor_cls = (
            BatchSpanProcessor if self._use_batch else SimpleSpanProcessor
        )
        self._provider.add_span_processor(processor_cls(self._exporter))
        trace.set_tracer_provider(self._provider)
    self._instrumented_providers.add(provider)
# GenAIInstrumentor.create_span method · python · L166-L199 (34 LOC) src/aumai_otel_genai/core.py
def create_span(
self,
operation: str,
attributes: GenAISpanAttributes,
) -> Generator[Span, None, None]:
"""
Context manager that creates an OTel span with GenAI attributes.
Usage::
with instrumentor.create_span("chat.completions", attrs) as span:
response = call_llm()
span.set_attribute("gen_ai.usage.output_tokens", response.tokens)
"""
if self._provider is None:
self.instrument(attributes.provider)
assert self._provider is not None
tracer = self._provider.get_tracer(_LIBRARY_NAME, _LIBRARY_VERSION)
otel_attrs = attributes.to_otel_dict()
with tracer.start_as_current_span(
f"gen_ai.{operation}",
attributes=otel_attrs,
) as span:
start = time.monotonic()
try:
yield span
except Exception as exc:
span.set_status(StatusCode.ERRORRepobility — same analyzer, your code, free for public repos · /scan/
GenAISpanProcessor.on_end method · python · L217-L225 (9 LOC) · src/aumai_otel_genai/core.py
def on_end(self, span: Any) -> None:
    """Tag *span* with the GenAI schema version, then forward it.

    Tagging is best-effort: any exception raised by ``span.set_attribute``
    is logged at debug level and otherwise ignored, so the wrapped
    processor (``self._inner``) always receives the span.

    Args:
        span: Span being ended; only its ``set_attribute`` method is used
            here before it is forwarded.
    """
    # NOTE(review): the OTel Python SDK appears to ignore attribute writes
    # on spans that have already ended (it logs a warning instead) —
    # confirm this enrichment takes effect here, or whether it belongs in
    # on_start.
    try:
        span.set_attribute("gen_ai.opentelemetry.schema_version", "1.25.0")
    except Exception:
        # Keep the best-effort contract, but leave a trace for debugging
        # instead of discarding the failure silently.
        import logging

        logging.getLogger(__name__).debug(
            "Could not set GenAI schema version on span", exc_info=True
        )
    self._inner.on_end(span)
# GenAISpanAttributes.to_otel_dict method · python · L34-L50 (17 LOC) src/aumai_otel_genai/models.py
def to_otel_dict(self) -> dict[str, Any]:
    """Flatten this object into a mapping of OTel attribute name -> value.

    Provider, model, token usage, finish reason, cost and latency are
    always emitted. Temperature and max_tokens are emitted only when they
    are not ``None``. Entries from ``self.extra`` are merged last, so they
    override any earlier key with the same name.
    """
    otel: dict[str, Any] = {}
    otel["gen_ai.system"] = self.provider
    otel["gen_ai.request.model"] = self.model
    otel["gen_ai.usage.input_tokens"] = self.input_tokens
    otel["gen_ai.usage.output_tokens"] = self.output_tokens
    otel["gen_ai.response.finish_reason"] = self.finish_reason
    otel["aumai.genai.cost_usd"] = self.cost_usd
    otel["aumai.genai.latency_ms"] = self.latency_ms

    # Optional request parameters: skipped entirely when unset.
    optional_fields = (
        ("gen_ai.request.temperature", self.temperature),
        ("gen_ai.request.max_tokens", self.max_tokens),
    )
    for attr_name, attr_value in optional_fields:
        if attr_value is not None:
            otel[attr_name] = attr_value

    otel.update(self.extra)
    return otel
# GenAIMetrics.record_request method · python · L64-L76 (13 LOC) src/aumai_otel_genai/models.py
def record_request(
    self,
    input_tokens: int,
    output_tokens: int,
    latency_ms: float,
    error: bool = False,
) -> None:
    """Fold one finished request into the running totals.

    Bumps ``request_count``, adds both token figures to ``token_count``,
    bumps ``error_count`` when *error* is truthy, and finally hands
    *latency_ms* to ``_record_latency``.
    """
    self.request_count = self.request_count + 1
    tokens_used = input_tokens + output_tokens
    self.token_count = self.token_count + tokens_used
    if error:
        self.error_count = self.error_count + 1
    self._record_latency(latency_ms)
# GenAIMetrics._record_latency method · python · L78-L89 (12 LOC) src/aumai_otel_genai/models.py
def _record_latency(self, latency_ms: float) -> None:
    """Count *latency_ms* in the histogram bucket that contains it.

    Bucket upper bounds are fixed; a sample belongs to the first bound it
    does not exceed. ``self.latency_histogram`` is a list of
    ``(upper_bound, count)`` tuples: the matching entry's count is bumped,
    or a new ``(upper_bound, 1)`` entry is appended when that bucket has
    not been used yet. A value that compares ``<=`` to no bound (e.g. NaN)
    is not recorded.
    """
    bounds = (10.0, 50.0, 100.0, 250.0, 500.0, 1000.0, 2500.0, 5000.0, float("inf"))
    target = next((bound for bound in bounds if latency_ms <= bound), None)
    if target is None:
        return

    histogram = self.latency_histogram
    for slot, (bound, hits) in enumerate(histogram):
        if bound == target:
            histogram[slot] = (bound, hits + 1)
            return
    # Bucket not present yet: entries are kept in first-use order.
    histogram.append((target, 1))