← back to invincible-jha__aumai-otel-genai

Function bodies 11 total

All specs Real LLM only Function bodies
instrument_command function · python · L42-L75 (34 LOC)
src/aumai_otel_genai/cli.py
def instrument_command(
    provider: str, exporter_type: str, demo: bool
) -> None:
    """Set up OTel instrumentation for an LLM provider."""
    from opentelemetry.sdk.trace.export import ConsoleSpanExporter
    from opentelemetry.sdk.trace.export.in_memory_span_exporter import (
        InMemorySpanExporter,
    )

    exporter = (
        InMemorySpanExporter() if exporter_type == "memory" else ConsoleSpanExporter()
    )
    instrumentor = GenAIInstrumentor(exporter=exporter)
    instrumentor.instrument(provider)
    click.echo(f"Instrumented provider: {provider}")
    click.echo(f"Exporter             : {exporter_type}")

    if demo:
        attrs = GenAISpanAttributes(
            model="demo-model",
            provider=provider,
            input_tokens=100,
            output_tokens=50,
            cost_usd=0.002,
            temperature=0.7,
            finish_reason="stop",
            latency_ms=0.0,
        )
        with instrumentor.create_span("chat.completions", att
metrics_command function · python · L94-L119 (26 LOC)
src/aumai_otel_genai/cli.py
def metrics_command(usage_path: str | None, output_fmt: str) -> None:
    """Collect and display GenAI metrics."""
    collector = GenAIMetricsCollector()

    if usage_path:
        for line in Path(usage_path).read_text().splitlines():
            line = line.strip()
            if not line:
                continue
            event = json.loads(line)
            collector.record(
                provider=event.get("provider", "unknown"),
                model=event.get("model", "unknown"),
                input_tokens=int(event.get("input_tokens", 0)),
                output_tokens=int(event.get("output_tokens", 0)),
                latency_ms=float(event.get("latency_ms", 0.0)),
                error=bool(event.get("error", False)),
            )

    if output_fmt == "prometheus":
        click.echo(collector.render_prometheus())
    else:
        all_m = {
            k: v.model_dump() for k, v in collector.all_metrics().items()
        }
        click.echo(json.dumps(all_m, ind
GenAIMetricsCollector.record method · python · L43-L61 (19 LOC)
src/aumai_otel_genai/core.py
    def record(
        self,
        provider: str,
        model: str,
        input_tokens: int,
        output_tokens: int,
        latency_ms: float,
        error: bool = False,
    ) -> None:
        """Record metrics for a single GenAI request."""
        key = f"{provider}/{model}"
        if key not in self._metrics:
            self._metrics[key] = GenAIMetrics()
        self._metrics[key].record_request(
            input_tokens=input_tokens,
            output_tokens=output_tokens,
            latency_ms=latency_ms,
            error=error,
        )
GenAIMetricsCollector.render_prometheus method · python · L72-L109 (38 LOC)
src/aumai_otel_genai/core.py
    def render_prometheus(self) -> str:
        """
        Render metrics in Prometheus text format (exposition format v0.0.4).

        Example output::

            # HELP genai_requests_total Total GenAI requests
            # TYPE genai_requests_total counter
            genai_requests_total{provider="openai",model="gpt-4"} 42
        """
        lines: list[str] = []

        def _label(key: str) -> str:
            provider, model = key.split("/", 1)
            return f'provider="{provider}",model="{model}"'

        lines += [
            "# HELP genai_requests_total Total GenAI requests",
            "# TYPE genai_requests_total counter",
        ]
        for key, m in self._metrics.items():
            lines.append(f"genai_requests_total{{{_label(key)}}} {m.request_count}")

        lines += [
            "# HELP genai_tokens_total Total tokens consumed",
            "# TYPE genai_tokens_total counter",
        ]
        for key, m in self._metrics.items():
            line
GenAIInstrumentor.__init__ method · python · L126-L134 (9 LOC)
src/aumai_otel_genai/core.py
    def __init__(
        self,
        exporter: SpanExporter | None = None,
        use_batch: bool = False,
    ) -> None:
        self._provider: TracerProvider | None = None
        self._exporter = exporter or ConsoleSpanExporter()
        self._use_batch = use_batch
        self._instrumented_providers: set[str] = set()
GenAIInstrumentor.instrument method · python · L136-L158 (23 LOC)
src/aumai_otel_genai/core.py
    def instrument(self, provider: str) -> None:
        """
        Set up an OTel TracerProvider for the given LLM *provider*.

        Multiple providers may be registered; each call is idempotent.
        """
        if self._provider is None:
            resource = Resource.create(
                {
                    "service.name": "aumai-otel-genai",
                    "gen_ai.system": provider,
                }
            )
            self._provider = TracerProvider(resource=resource)
            processor = (
                BatchSpanProcessor(self._exporter)
                if self._use_batch
                else SimpleSpanProcessor(self._exporter)
            )
            self._provider.add_span_processor(processor)
            trace.set_tracer_provider(self._provider)

        self._instrumented_providers.add(provider)
GenAIInstrumentor.create_span method · python · L166-L199 (34 LOC)
src/aumai_otel_genai/core.py
    def create_span(
        self,
        operation: str,
        attributes: GenAISpanAttributes,
    ) -> Generator[Span, None, None]:
        """
        Context manager that creates an OTel span with GenAI attributes.

        Usage::

            with instrumentor.create_span("chat.completions", attrs) as span:
                response = call_llm()
                span.set_attribute("gen_ai.usage.output_tokens", response.tokens)
        """
        if self._provider is None:
            self.instrument(attributes.provider)
        assert self._provider is not None

        tracer = self._provider.get_tracer(_LIBRARY_NAME, _LIBRARY_VERSION)
        otel_attrs = attributes.to_otel_dict()

        with tracer.start_as_current_span(
            f"gen_ai.{operation}",
            attributes=otel_attrs,
        ) as span:
            start = time.monotonic()
            try:
                yield span
            except Exception as exc:
                span.set_status(StatusCode.ERROR
Repobility — same analyzer, your code, free for public repos · /scan/
GenAISpanProcessor.on_end method · python · L217-L225 (9 LOC)
src/aumai_otel_genai/core.py
    def on_end(self, span: Any) -> None:
        """Enrich span with GenAI schema version before forwarding."""
        try:
            span.set_attribute(
                "gen_ai.opentelemetry.schema_version", "1.25.0"
            )
        except Exception:
            pass
        self._inner.on_end(span)
GenAISpanAttributes.to_otel_dict method · python · L34-L50 (17 LOC)
src/aumai_otel_genai/models.py
    def to_otel_dict(self) -> dict[str, Any]:
        """Return a flat dict of OTel attribute names -> values."""
        attrs: dict[str, Any] = {
            "gen_ai.system": self.provider,
            "gen_ai.request.model": self.model,
            "gen_ai.usage.input_tokens": self.input_tokens,
            "gen_ai.usage.output_tokens": self.output_tokens,
            "gen_ai.response.finish_reason": self.finish_reason,
            "aumai.genai.cost_usd": self.cost_usd,
            "aumai.genai.latency_ms": self.latency_ms,
        }
        if self.temperature is not None:
            attrs["gen_ai.request.temperature"] = self.temperature
        if self.max_tokens is not None:
            attrs["gen_ai.request.max_tokens"] = self.max_tokens
        attrs.update(self.extra)
        return attrs
GenAIMetrics.record_request method · python · L64-L76 (13 LOC)
src/aumai_otel_genai/models.py
    def record_request(
        self,
        input_tokens: int,
        output_tokens: int,
        latency_ms: float,
        error: bool = False,
    ) -> None:
        """Update counters for one completed request."""
        self.request_count += 1
        self.token_count += input_tokens + output_tokens
        if error:
            self.error_count += 1
        self._record_latency(latency_ms)
GenAIMetrics._record_latency method · python · L78-L89 (12 LOC)
src/aumai_otel_genai/models.py
    def _record_latency(self, latency_ms: float) -> None:
        """Insert into histogram using exponential buckets."""
        buckets = [10.0, 50.0, 100.0, 250.0, 500.0, 1000.0, 2500.0, 5000.0, float("inf")]
        for upper in buckets:
            if latency_ms <= upper:
                # Find existing bucket entry
                for i, (b, _) in enumerate(self.latency_histogram):
                    if b == upper:
                        self.latency_histogram[i] = (b, self.latency_histogram[i][1] + 1)
                        return
                self.latency_histogram.append((upper, 1))
                return