← back to dr-robert-li__dgx-toolbox

Function bodies 178 total

All specs Real LLM only Function bodies
query_by_timerange method · python · L490-L515 (26 LOC)
harness/traces/store.py
    async def query_by_timerange(
        self, since: str, until: str | None = None
    ) -> list[dict]:
        """Fetch trace records within a timestamp range.

        Args:
            since: ISO8601 string — lower bound (inclusive).
            until: ISO8601 string — upper bound (inclusive), or None for open-ended.

        Returns:
            List of trace record dicts ordered by timestamp ASC.
        """
        async with aiosqlite.connect(self._db_path) as db:
            db.row_factory = aiosqlite.Row
            if until is None:
                query = "SELECT * FROM traces WHERE timestamp >= ? ORDER BY timestamp ASC"
                params: tuple = (since,)
            else:
                query = (
                    "SELECT * FROM traces WHERE timestamp >= ? AND timestamp <= ? "
                    "ORDER BY timestamp ASC"
                )
                params = (since, until)
            async with db.execute(query, params) as cursor:
                rows = awa
load_config function · python · L21-L23 (3 LOC)
scripts/_litellm_register.py
def load_config():
    """Read and parse the LiteLLM YAML config at CONFIG_PATH.

    Returns:
        The parsed YAML document, or ``{}`` when the file is empty
        (``yaml.safe_load`` yields ``None`` for an empty document).
    """
    # Pin UTF-8: the file may contain non-ASCII text (it is written with
    # allow_unicode=True), and the platform's default codec may not decode it.
    with open(CONFIG_PATH, encoding="utf-8") as f:
        return yaml.safe_load(f) or {}
save_config function · python · L26-L32 (7 LOC)
scripts/_litellm_register.py
def save_config(config):
    """Overwrite CONFIG_PATH with *config* as YAML, keeping a timestamped backup.

    The current file is copied (with metadata, via ``shutil.copy2``) to
    ``CONFIG_PATH + ".bak.YYYYmmddHHMMSS"`` before it is overwritten.

    Args:
        config: The config mapping to serialize.
    """
    # Serialize first: opening the file with "w" truncates it immediately, so
    # a yaml.dump failure mid-write would otherwise leave an empty config.
    text = yaml.dump(config, default_flow_style=False, allow_unicode=True, sort_keys=False)
    # Backup before destructive write
    backup = CONFIG_PATH + f".bak.{datetime.now().strftime('%Y%m%d%H%M%S')}"
    shutil.copy2(CONFIG_PATH, backup)
    # allow_unicode=True emits raw non-ASCII characters, so the encoding must
    # be pinned rather than left to the locale default.
    with open(CONFIG_PATH, "w", encoding="utf-8") as f:
        f.write(text)
    print(f"Backup saved: {backup}")
add_model function · python · L35-L51 (17 LOC)
scripts/_litellm_register.py
def add_model(model_name, api_base):
    """Register *model_name* as an ``openai/``-prefixed model served at *api_base*.

    Prints a message and returns without saving if a model with the same
    ``model_name`` is already listed; otherwise appends the entry and saves.

    Args:
        model_name: Name to register (also used in ``litellm_params.model``).
        api_base: Base URL of the serving endpoint.
    """
    config = load_config()
    # A bare `model_list:` key parses as None; setdefault would hand that None
    # back and .append would crash, so normalise it to a real list.
    model_list = config.get("model_list") or []
    config["model_list"] = model_list
    if any(m.get("model_name") == model_name for m in model_list):
        print(f"Already registered: {model_name}")
        return
    model_list.append({
        "model_name": model_name,
        "litellm_params": {
            "model": f"openai/{model_name}",
            "api_base": api_base,
            "api_key": "none"
        }
    })
    save_config(config)
    print(f"Registered: {model_name}")
remove_model function · python · L54-L63 (10 LOC)
scripts/_litellm_register.py
def remove_model(model_name):
    """Remove every entry named *model_name* from the config and save it.

    Exits the process with status 1 (without saving) if no entry matches.

    Args:
        model_name: The ``model_name`` to remove.
    """
    config = load_config()
    # A bare `model_list:` key parses as None; treat it as an empty list
    # instead of crashing on len(None).
    model_list = config.get("model_list") or []
    kept = [m for m in model_list if m.get("model_name") != model_name]
    if len(kept) == len(model_list):
        print(f"Not found: {model_name}")
        sys.exit(1)
    config["model_list"] = kept
    save_config(config)
    print(f"Removed: {model_name}")
list_models function · python · L66-L71 (6 LOC)
scripts/_litellm_register.py
def list_models():
    """Print each registered model name and its ``api_base`` ("?" if missing)."""
    config = load_config()
    # `model_list:` or `litellm_params:` with no value parse as None; fall
    # back to empty containers so listing never crashes on them.
    for m in config.get("model_list") or []:
        name = m.get("model_name", "?")
        base = (m.get("litellm_params") or {}).get("api_base", "?")
        print(f"  {name}  ->  {base}")
AnchorStore class · python · L35-L170 (136 LOC)
telemetry/telemetry/anchor_store.py
class AnchorStore:
    """Persistent store for proven training configuration anchors.

    Args:
        store_path: Path to the JSON store file. Created on first write.
    """

    def __init__(self, store_path: Path) -> None:
        self._store_path = Path(store_path)
        self._records: dict = self._load()

    def compute_config_hash(self, config: dict) -> str:
        """Compute SHA-256 hash of the 9 locked HASH_FIELDS in the config.

        Fields are concatenated in HASH_FIELDS order with "|" separator.
        Missing fields are represented as empty strings.

        Args:
            config: Training configuration dict.

        Returns:
            SHA-256 hex digest string.
        """
        values = [str(config.get(field, "")) for field in HASH_FIELDS]
        raw = "|".join(values)
        return hashlib.sha256(raw.encode("utf-8")).hexdigest()

    def lookup(self, config_hash: str) -> dict | None:
        """Return record by config_hash if exists and not expired; 
Repobility (the analyzer behind this table) · https://repobility.com
__init__ method · python · L42-L44 (3 LOC)
telemetry/telemetry/anchor_store.py
    def __init__(self, store_path: Path) -> None:
        """Bind the store to *store_path* and eagerly load any saved records.

        Args:
            store_path: Location of the JSON store file (str or Path accepted;
                normalised to Path).
        """
        path = Path(store_path)
        # Must be set before _load(), which reads self._store_path.
        self._store_path = path
        self._records: dict = self._load()
compute_config_hash method · python · L46-L60 (15 LOC)
telemetry/telemetry/anchor_store.py
    def compute_config_hash(self, config: dict) -> str:
        """Fingerprint a training config by the fields listed in HASH_FIELDS.

        Each field is taken in HASH_FIELDS order and passed through ``str()``;
        a field absent from *config* contributes ``""``. The pieces are joined
        with ``"|"`` and the UTF-8 bytes are hashed with SHA-256.

        Args:
            config: Training configuration dict.

        Returns:
            SHA-256 hex digest string.
        """
        # NOTE(review): a value containing "|" could collide with a different
        # split of fields; left as-is because changing the encoding would
        # invalidate every hash already persisted in the store.
        joined = "|".join(str(config.get(name, "")) for name in HASH_FIELDS)
        return hashlib.sha256(joined.encode("utf-8")).hexdigest()
lookup method · python · L62-L80 (19 LOC)
telemetry/telemetry/anchor_store.py
    def lookup(self, config_hash: str) -> dict | None:
        """Return record by config_hash if exists and not expired; None otherwise.

        Expired records are purged from the in-memory store on access but NOT
        immediately persisted (lazy purge to avoid unnecessary disk writes).

        Args:
            config_hash: SHA-256 hex digest identifying the config.

        Returns:
            Record dict or None if not found or expired.
        """
        record = self._records.get(config_hash)
        if record is None:
            return None
        if self._is_expired(record):
            del self._records[config_hash]
            return None
        return record
apply_override method · python · L82-L125 (44 LOC)
telemetry/telemetry/anchor_store.py
    def apply_override(
        self,
        config_hash: str,
        status: str,
        batch_size: int,
        tier_cap: int,
        step_size: int = 2,
    ) -> dict:
        """Create or replace the anchor record for a config hash.

        Single-record-per-hash: writing to an existing config_hash REPLACES
        the previous record (newest write wins — not accumulation).

        Override rules (TELEM-10):
            COMPLETED → batch_cap = max(tier_cap, batch_size + step_size)
            OOM       → batch_cap = batch_size - step_size
            WATCHDOG  → batch_cap = batch_size - step_size
            HANG      → NO batch_cap key (TELEM-14)

        Args:
            config_hash: SHA-256 hex digest identifying the config.
            status: One of COMPLETED, OOM, WATCHDOG, HANG.
            batch_size: Current batch size observed.
            tier_cap: Tier-level maximum batch cap.
            step_size: Step increment/decrement for cap calculation.

        Returns:
_load method · python · L127-L140 (14 LOC)
telemetry/telemetry/anchor_store.py
    def _load(self) -> dict:
        """Load records from JSON file. Returns empty dict on any failure."""
        try:
            content = self._store_path.read_text(encoding="utf-8")
            return json.loads(content)
        except FileNotFoundError:
            return {}
        except json.JSONDecodeError as exc:
            logger.warning(
                "AnchorStore: corrupted JSON at %s (%s) — starting with empty store.",
                self._store_path,
                exc,
            )
            return {}
_save method · python · L142-L155 (14 LOC)
telemetry/telemetry/anchor_store.py
    def _save(self) -> None:
        """Atomic write using write-to-temp-then-rename pattern.

        Writes to a temporary file in the same directory, then renames
        to the final path. Ensures the store file is never partially written
        if the process is interrupted.
        """
        self._store_path.parent.mkdir(parents=True, exist_ok=True)
        tmp_path = self._store_path.with_suffix(".tmp")
        tmp_path.write_text(
            json.dumps(self._records, indent=2, default=str),
            encoding="utf-8",
        )
        os.replace(tmp_path, self._store_path)
_is_expired method · python · L157-L170 (14 LOC)
telemetry/telemetry/anchor_store.py
    def _is_expired(self, record: dict) -> bool:
        """Return True if the record's created_at is older than EXPIRY_DAYS from now (UTC)."""
        created_at_str = record.get("created_at")
        if not created_at_str:
            return False
        try:
            created_at = datetime.fromisoformat(created_at_str)
            # Ensure timezone-aware comparison
            if created_at.tzinfo is None:
                created_at = created_at.replace(tzinfo=timezone.utc)
            now = datetime.now(timezone.utc)
            return (now - created_at) > timedelta(days=EXPIRY_DAYS)
        except (ValueError, TypeError):
            return False
_lora_multiplier function · python · L28-L36 (9 LOC)
telemetry/telemetry/effective_scale.py
def _lora_multiplier(lora_rank: int) -> float:
    """LoRA rank 0 means full fine-tune (multiplier 1.0).

    Higher rank = more trainable params but still << full fine-tune.
    """
    if lora_rank == 0:
        return 1.0
    # Approximate: rank/1024 fraction of params are trainable
    return max(0.05, lora_rank / 1024.0)
Repobility — the code-quality scanner for AI-generated software · https://repobility.com
_seq_len_multiplier function · python · L39-L41 (3 LOC)
telemetry/telemetry/effective_scale.py
def _seq_len_multiplier(seq_len: int) -> float:
    """Sequence length scaling: normalized to 2048 base."""
    return max(0.5, seq_len / 2048.0)
compute function · python · L63-L103 (41 LOC)
telemetry/telemetry/effective_scale.py
def compute(
    raw_params: float,
    quant_mode: str = "fp16",
    training_framework: str = "pytorch",
    gradient_checkpointing_mode: str = "none",
    lora_rank: int = 0,
    seq_len: int = 2048,
    optimizer: str = "adamw",
    model_weight_gb: float = 0.0,
) -> dict:
    """Compute effective parameter scale and assign tier.

    Returns dict with:
        effective_params: float — scaled parameter count
        tier: {"batch_cap": int, "min_headroom_pct": int}
    """
    quant_mult = QUANT_MULTIPLIERS.get(quant_mode, 2.0)
    grad_mult = GRAD_CKPT_MULTIPLIERS.get(gradient_checkpointing_mode, 1.0)
    lora_mult = _lora_multiplier(lora_rank)
    seq_mult = _seq_len_multiplier(seq_len)
    opt_mult = OPTIMIZER_MULTIPLIERS.get(optimizer, 1.0)

    effective = raw_params * quant_mult * grad_mult * lora_mult * seq_mult * opt_mult

    # Find tier based on raw_params (raw model size determines hardware requirements).
    # effective_params captures memory footprint for headroom cal
classify_failure function · python · L10-L65 (56 LOC)
telemetry/telemetry/failure_classifier.py
def classify_failure(
    final_readings: dict,
    exit_code: int,
    training_completed: bool,
) -> dict:
    """Classify a training run outcome from its final telemetry snapshot.

    Args:
        final_readings: Dict with keys mem_available_gb, gpu_util_pct,
            cpu_pct, temperature_c, duration_at_state_s.
        exit_code: Process exit code (0 = normal).
        training_completed: Whether the training loop finished normally.

    Returns:
        Dict with "classification" (str) and "evidence" (dict).
        HANG returns never contain a "batch_cap" key.
    """
    if training_completed and exit_code == 0:
        return {"classification": "clean", "evidence": {}}

    mem_gb = final_readings.get("mem_available_gb", 99.0)
    gpu_util = final_readings.get("gpu_util_pct", 100)
    cpu_pct = final_readings.get("cpu_pct", 0)
    temp_c = final_readings.get("temperature_c", 0)
    duration_s = final_readings.get("duration_at_state_s", 0)

    # OOM: GPU idle + near-zero m
prepare_probe function · python · L18-L57 (40 LOC)
telemetry/telemetry/probe.py
def prepare_probe(
    current_config: dict,
    proposed_changes: dict,
    probe_dir: Path | None = None,
) -> dict:
    """Write rollback and probe configs for a configuration test.

    Args:
        current_config: The current training config dict.
        proposed_changes: Dict of config keys to change for the probe.
        probe_dir: Directory for probe files (default: /tmp/telemetry_probe).

    Returns:
        {"rollback_config_path": Path, "probe_config_path": Path, "results_path": Path}
    """
    if probe_dir is None:
        probe_dir = Path("/tmp/telemetry_probe")
    probe_dir = Path(probe_dir)
    probe_dir.mkdir(parents=True, exist_ok=True)

    rollback_path = probe_dir / "rollback_config.json"
    probe_path = probe_dir / "probe_config.json"
    results_path = probe_dir / "probe_results.jsonl"

    # Write rollback (original config)
    rollback_path.write_text(json.dumps(current_config, indent=2), encoding="utf-8")

    # Write probe (merged config: current overr
evaluate_probe function · python · L60-L134 (75 LOC)
telemetry/telemetry/probe.py
def evaluate_probe(
    results_path: Path,
    baseline: dict,
    tier_headroom_pct: float,
    jitter_margin_gb: float = 5.0,
) -> dict:
    """Evaluate probe results and recommend commit or revert.

    Reads the results JSONL file, finds the minimum mem_available_gb
    (peak memory usage = minimum available), compares against the
    safe threshold from calculate_headroom.

    Commit requires headroom_gb > 0 (strictly positive). Equal-to-threshold
    reverts because zero headroom provides no safety margin.

    Args:
        results_path: Path to the JSONL file with probe telemetry records.
        baseline: Baseline dict with "mem_available_gb" key.
        tier_headroom_pct: Percentage of baseline memory to reserve as threshold.
        jitter_margin_gb: Additional GB margin for jitter (default: 5.0).

    Returns:
        {"action": "commit"|"revert", "reason": str, "anchor_record": dict|None}
    """
    results_path = Path(results_path)
    lines = results_path.read_text(e
GPUSampler class · python · L25-L151 (127 LOC)
telemetry/telemetry/sampler.py
class GPUSampler:
    """Sample GPU and memory telemetry from a DGX Spark node.

    Attributes:
        _mock: True when NVML library is absent or unavailable.
        _handle: NVML device handle (None in mock mode).
    """

    def __init__(self) -> None:
        """Initialize the sampler.

        Tries to initialize NVML and get a device handle. Sets mock mode
        if any NVMLError is raised (covers libnvidia-ml.so.1 absent,
        no GPU present, and permission errors).
        """
        self._mock: bool = False
        self._handle = None
        try:
            pynvml.nvmlInit()
            self._handle = pynvml.nvmlDeviceGetHandleByIndex(0)
        except pynvml.NVMLError:
            self._mock = True

    @property
    def mock(self) -> bool:
        """Return True if running in mock mode (no GPU hardware available)."""
        return self._mock

    def sample(self) -> dict:
        """Sample current GPU and memory telemetry.

        Returns:
            Dict with k
__init__ method · python · L33-L46 (14 LOC)
telemetry/telemetry/sampler.py
    def __init__(self) -> None:
        """Set up NVML access for GPU index 0, or fall back to mock mode.

        Both ``pynvml.nvmlInit()`` and the device-handle lookup are attempted.
        Any ``pynvml.NVMLError`` (library missing, no GPU, permission denied)
        leaves ``_handle`` as None and switches the sampler to mock mode.
        """
        self._handle = None
        self._mock: bool = False
        try:
            pynvml.nvmlInit()
            device = pynvml.nvmlDeviceGetHandleByIndex(0)
        except pynvml.NVMLError:
            self._mock = True
        else:
            self._handle = device
sample method · python · L53-L113 (61 LOC)
telemetry/telemetry/sampler.py
    def sample(self) -> dict:
        """Sample current GPU and memory telemetry.

        Returns:
            Dict with keys:
                watts (float): GPU power draw in Watts.
                temperature_c (int): GPU temperature in Celsius.
                gpu_util_pct (int): GPU utilization percentage (0-100).
                mem_available_gb (float): Available memory from /proc/meminfo MemAvailable.
                page_cache_gb (float): Page cache size from /proc/meminfo Cached.
                mock (bool): True if no GPU hardware was available.

        Note:
            Memory is ALWAYS read from /proc/meminfo, never from
            nvmlDeviceGetMemoryInfo (GB10 UMA architecture — TELEM-02).
            No subprocess calls are made at any point.
        """
        if self._mock:
            # Memory is always read from /proc/meminfo (UMA architecture).
            # GPU NVML metrics (watts, temp, util) fall back to 0 when
            # libnvidia-ml.so.1 is absent, but /p
Powered by Repobility — scan your code at https://repobility.com
_read_meminfo method · python · L115-L139 (25 LOC)
telemetry/telemetry/sampler.py
    def _read_meminfo(self, key: str) -> float:
        """Look up *key* in /proc/meminfo and convert its kB value to GB.

        Args:
            key: Field name without the trailing colon, e.g. "MemAvailable".

        Returns:
            The value divided by 1024 * 1024. Returns 0.0 in these cases: the
            file cannot be read, the key is not present, or the first matching
            line's value is not an integer.
        """
        try:
            text = _MEMINFO_PATH.read_text()
        except OSError:
            return 0.0

        prefix = f"{key}:"
        for entry in text.splitlines():
            if not entry.startswith(prefix):
                continue
            # Expected shape: "MemAvailable:   83886080 kB"
            fields = entry.split()
            if len(fields) < 2:
                continue
            try:
                return int(fields[1]) / (1024 * 1024)
            except ValueError:
                return 0.0
        return 0.0
append_jsonl method · python · L141-L151 (11 LOC)
telemetry/telemetry/sampler.py
    def append_jsonl(self, path: Union[str, Path]) -> None:
        """Sample telemetry and append a JSON record to a NDJSON file.

        Args:
            path: Path to the NDJSON output file. Created if it does not exist.
                  Appended to if it already exists.
        """
        record = self.sample()
        record["ts"] = time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime())
        with open(path, "a", encoding="utf-8") as fh:
            fh.write(json.dumps(record) + "\n")
UMAMemModel class · python · L20-L79 (60 LOC)
telemetry/telemetry/uma_model.py
class UMAMemModel:
    def __init__(self, sampler: GPUSampler):
        self._sampler = sampler

    def sample_baseline(self) -> dict:
        """Drop page cache (best-effort) then sample baseline memory state.

        Returns dict with keys: mem_available_gb, page_cache_gb,
        idle_watts, timestamp.

        If drop_caches fails (non-root), logs a warning that the baseline
        may include cached pages (dirty baseline). Addresses review concern:
        callers should know when baseline is not clean.
        """
        # Best-effort cache drop — requires root
        try:
            _DROP_CACHES_PATH.write_text("3")
        except (PermissionError, OSError) as exc:
            logger.warning(
                "Could not drop page cache (%s) — baseline may include "
                "cached pages (dirty baseline). Run as root for a clean "
                "baseline.",
                exc,
            )

        snapshot = self._sampler.sample()
        return {
            "m
sample_baseline method · python · L24-L51 (28 LOC)
telemetry/telemetry/uma_model.py
    def sample_baseline(self) -> dict:
        """Drop page cache (best-effort) then sample baseline memory state.

        Returns dict with keys: mem_available_gb, page_cache_gb,
        idle_watts, timestamp.

        If drop_caches fails (non-root), logs a warning that the baseline
        may include cached pages (dirty baseline). Addresses review concern:
        callers should know when baseline is not clean.
        """
        # Best-effort cache drop — requires root
        try:
            _DROP_CACHES_PATH.write_text("3")
        except (PermissionError, OSError) as exc:
            logger.warning(
                "Could not drop page cache (%s) — baseline may include "
                "cached pages (dirty baseline). Run as root for a clean "
                "baseline.",
                exc,
            )

        snapshot = self._sampler.sample()
        return {
            "mem_available_gb": snapshot["mem_available_gb"],
            "page_cache_gb": snapshot["page_cache
calculate_headroom method · python · L54-L79 (26 LOC)
telemetry/telemetry/uma_model.py
    def calculate_headroom(
        baseline: dict,
        current: dict,
        tier_headroom_pct: float,
        jitter_margin_gb: float = 5.0,
    ) -> dict:
        """Calculate safe memory threshold and headroom.

        safe_threshold = baseline_mem * (tier_headroom_pct/100) + jitter_margin_gb
        headroom_gb = current_mem - safe_threshold
        headroom_pct = headroom_gb / current_mem * 100  (0 if current_mem == 0)

        UMA semantics: pin_memory always False, prefetch_factor capped at 4.
        """
        baseline_mem = baseline.get("mem_available_gb", 0.0)
        current_mem = current.get("mem_available_gb", 0.0)
        safe_threshold = baseline_mem * (tier_headroom_pct / 100.0) + jitter_margin_gb
        headroom_gb = current_mem - safe_threshold
        headroom_pct = (headroom_gb / current_mem * 100.0) if current_mem > 0 else 0.0
        return {
            "safe_threshold": round(safe_threshold, 2),
            "headroom_gb": round(headroom_gb, 2),
       
‹ prevpage 4 / 4