Function bodies 178 total
query_by_timerange method · python · L490-L515 (26 LOC)harness/traces/store.py
async def query_by_timerange(
self, since: str, until: str | None = None
) -> list[dict]:
"""Fetch trace records within a timestamp range.
Args:
since: ISO8601 string — lower bound (inclusive).
until: ISO8601 string — upper bound (inclusive), or None for open-ended.
Returns:
List of trace record dicts ordered by timestamp ASC.
"""
async with aiosqlite.connect(self._db_path) as db:
db.row_factory = aiosqlite.Row
if until is None:
query = "SELECT * FROM traces WHERE timestamp >= ? ORDER BY timestamp ASC"
params: tuple = (since,)
else:
query = (
"SELECT * FROM traces WHERE timestamp >= ? AND timestamp <= ? "
"ORDER BY timestamp ASC"
)
params = (since, until)
async with db.execute(query, params) as cursor:
rows = awa…
load_config function · python · L21-L23 (3 LOC)scripts/_litellm_register.py
def load_config():
    """Parse the YAML file at CONFIG_PATH and return its contents.

    Returns:
        The object produced by ``yaml.safe_load``; ``{}`` when the parsed
        value is falsy (for example an empty file).
    """
    with open(CONFIG_PATH) as handle:
        parsed = yaml.safe_load(handle)
        # An empty document parses to None; callers expect a mapping.
        return parsed or {}
# save_config function · python · L26-L32 (7 LOC)scripts/_litellm_register.py
def save_config(config):
    """Overwrite the file at CONFIG_PATH with ``config`` serialized as YAML.

    A timestamped copy of the current file is made first, so the previous
    contents can be recovered after the destructive write.

    Args:
        config: Object to serialize with ``yaml.dump``.
    """
    # Backup name is built by string concatenation, so CONFIG_PATH is
    # presumably a str — verify against its definition.
    stamp = datetime.now().strftime('%Y%m%d%H%M%S')
    backup = CONFIG_PATH + f".bak.{stamp}"
    shutil.copy2(CONFIG_PATH, backup)

    with open(CONFIG_PATH, "w") as out:
        yaml.dump(
            config,
            out,
            default_flow_style=False,
            allow_unicode=True,
            sort_keys=False,
        )
    print(f"Backup saved: {backup}")
# add_model function · python · L35-L51 (17 LOC)scripts/_litellm_register.py
def add_model(model_name, api_base):
    """Append a model entry to ``model_list`` in the config and save it.

    Does nothing except print a notice when an entry with the same
    ``model_name`` is already present.

    Args:
        model_name: Name to register; also used to build the
            ``openai/<model_name>`` value of ``litellm_params.model``.
        api_base: Value stored under ``litellm_params.api_base``.
    """
    config = load_config()
    model_list = config.get("model_list")
    if model_list is None:
        # A bare ``model_list:`` key in YAML parses to None. ``setdefault``
        # would hand that None back and the iteration below would raise
        # TypeError, so install a real list in that case as well.
        model_list = config["model_list"] = []

    existing = [m["model_name"] for m in model_list if "model_name" in m]
    if model_name in existing:
        print(f"Already registered: {model_name}")
        return

    model_list.append({
        "model_name": model_name,
        "litellm_params": {
            "model": f"openai/{model_name}",
            "api_base": api_base,
            "api_key": "none"
        }
    })
    save_config(config)
    print(f"Registered: {model_name}")
# remove_model function · python · L54-L63 (10 LOC)scripts/_litellm_register.py
def remove_model(model_name):
    """Remove every ``model_list`` entry named ``model_name`` and save.

    Args:
        model_name: Value matched against each entry's ``model_name``.

    Raises:
        SystemExit: With status 1 (via ``sys.exit``) when no entry matched;
            the config is not saved in that case.
    """
    config = load_config()
    # ``or []`` also covers a bare ``model_list:`` key, which YAML parses to
    # None and which would otherwise make ``len()`` raise TypeError.
    model_list = config.get("model_list") or []
    remaining = [m for m in model_list if m.get("model_name") != model_name]
    config["model_list"] = remaining

    if len(remaining) == len(model_list):
        print(f"Not found: {model_name}")
        sys.exit(1)

    save_config(config)
    print(f"Removed: {model_name}")
# list_models function · python · L66-L71 (6 LOC)scripts/_litellm_register.py
def list_models():
    """Print one ``<model_name> -> <api_base>`` line per configured model.

    Missing names or API bases are printed as ``?``.
    """
    config = load_config()
    # ``or []`` / ``or {}`` cover keys that exist with an empty value, which
    # YAML parses to None; ``.get(key, default)`` alone would return that None.
    for m in config.get("model_list") or []:
        name = m.get("model_name", "?")
        params = m.get("litellm_params") or {}
        base = params.get("api_base", "?")
        print(f" {name} -> {base}")
# AnchorStore class · python · L35-L170 (136 LOC)telemetry/telemetry/anchor_store.py
class AnchorStore:
"""Persistent store for proven training configuration anchors.
Args:
store_path: Path to the JSON store file. Created on first write.
"""
def __init__(self, store_path: Path) -> None:
self._store_path = Path(store_path)
self._records: dict = self._load()
def compute_config_hash(self, config: dict) -> str:
"""Compute SHA-256 hash of the 9 locked HASH_FIELDS in the config.
Fields are concatenated in HASH_FIELDS order with "|" separator.
Missing fields are represented as empty strings.
Args:
config: Training configuration dict.
Returns:
SHA-256 hex digest string.
"""
values = [str(config.get(field, "")) for field in HASH_FIELDS]
raw = "|".join(values)
return hashlib.sha256(raw.encode("utf-8")).hexdigest()
def lookup(self, config_hash: str) -> dict | None:
"""Return record by config_hash if exists and not expired; …
Repobility (the analyzer behind this table) · https://repobility.com
__init__ method · python · L42-L44 (3 LOC)telemetry/telemetry/anchor_store.py
def __init__(self, store_path: Path) -> None:
    """Remember the store location and load its records.

    Args:
        store_path: Location of the store file; coerced with ``Path(...)``.
    """
    location = Path(store_path)
    self._store_path = location
    # _load reads self._store_path, so it must be assigned first.
    loaded: dict = self._load()
    self._records = loaded
# compute_config_hash method · python · L46-L60 (15 LOC)telemetry/telemetry/anchor_store.py
def compute_config_hash(self, config: dict) -> str:
    """Hash the HASH_FIELDS values of a training config with SHA-256.

    Each field named in HASH_FIELDS is looked up in ``config`` (a missing
    field contributes an empty string), converted with ``str``, and the
    results are joined with "|" in HASH_FIELDS order before hashing.

    Args:
        config: Training configuration dict.

    Returns:
        SHA-256 hex digest string.
    """
    joined = "|".join(str(config.get(name, "")) for name in HASH_FIELDS)
    hasher = hashlib.sha256()
    hasher.update(joined.encode("utf-8"))
    return hasher.hexdigest()
# lookup method · python · L62-L80 (19 LOC)telemetry/telemetry/anchor_store.py
def lookup(self, config_hash: str) -> dict | None:
"""Return record by config_hash if exists and not expired; None otherwise.
Expired records are purged from the in-memory store on access but NOT
immediately persisted (lazy purge to avoid unnecessary disk writes).
Args:
config_hash: SHA-256 hex digest identifying the config.
Returns:
Record dict or None if not found or expired.
"""
record = self._records.get(config_hash)
if record is None:
return None
if self._is_expired(record):
del self._records[config_hash]
return None
return recordapply_override method · python · L82-L125 (44 LOC)telemetry/telemetry/anchor_store.py
def apply_override(
self,
config_hash: str,
status: str,
batch_size: int,
tier_cap: int,
step_size: int = 2,
) -> dict:
"""Create or replace the anchor record for a config hash.
Single-record-per-hash: writing to an existing config_hash REPLACES
the previous record (newest write wins — not accumulation).
Override rules (TELEM-10):
COMPLETED → batch_cap = max(tier_cap, batch_size + step_size)
OOM → batch_cap = batch_size - step_size
WATCHDOG → batch_cap = batch_size - step_size
HANG → NO batch_cap key (TELEM-14)
Args:
config_hash: SHA-256 hex digest identifying the config.
status: One of COMPLETED, OOM, WATCHDOG, HANG.
batch_size: Current batch size observed.
tier_cap: Tier-level maximum batch cap.
step_size: Step increment/decrement for cap calculation.
Returns: …
_load method · python · L127-L140 (14 LOC)telemetry/telemetry/anchor_store.py
def _load(self) -> dict:
    """Load records from the JSON store file.

    Returns:
        The decoded JSON object. An empty dict is returned when the file
        does not exist, cannot be read, is not valid UTF-8 or JSON, or when
        its top-level value is not a JSON object. Every case except a
        missing file is logged as a warning.
    """
    try:
        content = self._store_path.read_text(encoding="utf-8")
        records = json.loads(content)
    except FileNotFoundError:
        # Normal on first use: the store file does not exist yet.
        return {}
    except OSError as exc:
        logger.warning(
            "AnchorStore: cannot read %s (%s) — starting with empty store.",
            self._store_path,
            exc,
        )
        return {}
    except ValueError as exc:
        # json.JSONDecodeError and UnicodeDecodeError are both ValueError
        # subclasses; either one means the file content is unusable.
        logger.warning(
            "AnchorStore: corrupted JSON at %s (%s) — starting with empty store.",
            self._store_path,
            exc,
        )
        return {}

    if not isinstance(records, dict):
        # Valid JSON of another type (list, number, ...) would break the
        # dict operations performed on the records later.
        logger.warning(
            "AnchorStore: expected a JSON object at %s, found %s — starting with empty store.",
            self._store_path,
            type(records).__name__,
        )
        return {}
    return records
# _save method · python · L142-L155 (14 LOC)telemetry/telemetry/anchor_store.py
def _save(self) -> None:
    """Persist the records as JSON via a temp file and ``os.replace``.

    The parent directory is created if needed. The JSON is written to a
    sibling file whose suffix is replaced by ``.tmp`` and then moved over
    the store path, so the store file itself is never written in place.
    Values the JSON encoder cannot handle are converted with ``str``.
    """
    target = self._store_path
    target.parent.mkdir(parents=True, exist_ok=True)

    scratch = target.with_suffix(".tmp")
    payload = json.dumps(self._records, indent=2, default=str)
    scratch.write_text(payload, encoding="utf-8")

    os.replace(scratch, target)
# _is_expired method · python · L157-L170 (14 LOC)telemetry/telemetry/anchor_store.py
def _is_expired(self, record: dict) -> bool:
    """Tell whether a record's ``created_at`` is more than EXPIRY_DAYS old.

    Args:
        record: Record dict; ``created_at`` is parsed with
            ``datetime.fromisoformat``.

    Returns:
        True when the age measured against the current UTC time exceeds
        EXPIRY_DAYS days. False when ``created_at`` is missing, falsy, or
        cannot be parsed.
    """
    stamp = record.get("created_at")
    if not stamp:
        return False
    try:
        born = datetime.fromisoformat(stamp)
        if born.tzinfo is None:
            # Naive timestamps are interpreted as UTC so the subtraction
            # below is between two aware datetimes.
            born = born.replace(tzinfo=timezone.utc)
        age = datetime.now(timezone.utc) - born
        return age > timedelta(days=EXPIRY_DAYS)
    except (ValueError, TypeError):
        return False
# _lora_multiplier function · python · L28-L36 (9 LOC)telemetry/telemetry/effective_scale.py
def _lora_multiplier(lora_rank: int) -> float:
    """Return the scale multiplier for a LoRA rank.

    Rank 0 is treated as a full fine-tune and maps to 1.0. Any other rank
    maps to ``lora_rank / 1024`` with a floor of 0.05, an approximation of
    the fraction of parameters that are trainable.
    """
    if lora_rank != 0:
        return max(0.05, lora_rank / 1024.0)
    return 1.0
# Repobility — the code-quality scanner for AI-generated software · https://repobility.com
_seq_len_multiplier function · python · L39-L41 (3 LOC)telemetry/telemetry/effective_scale.py
def _seq_len_multiplier(seq_len: int) -> float:
    """Return ``seq_len / 2048`` with a floor of 0.5."""
    ratio = seq_len / 2048.0
    return ratio if ratio > 0.5 else 0.5
# compute function · python · L63-L103 (41 LOC)telemetry/telemetry/effective_scale.py
def compute(
raw_params: float,
quant_mode: str = "fp16",
training_framework: str = "pytorch",
gradient_checkpointing_mode: str = "none",
lora_rank: int = 0,
seq_len: int = 2048,
optimizer: str = "adamw",
model_weight_gb: float = 0.0,
) -> dict:
"""Compute effective parameter scale and assign tier.
Returns dict with:
effective_params: float — scaled parameter count
tier: {"batch_cap": int, "min_headroom_pct": int}
"""
quant_mult = QUANT_MULTIPLIERS.get(quant_mode, 2.0)
grad_mult = GRAD_CKPT_MULTIPLIERS.get(gradient_checkpointing_mode, 1.0)
lora_mult = _lora_multiplier(lora_rank)
seq_mult = _seq_len_multiplier(seq_len)
opt_mult = OPTIMIZER_MULTIPLIERS.get(optimizer, 1.0)
effective = raw_params * quant_mult * grad_mult * lora_mult * seq_mult * opt_mult
# Find tier based on raw_params (raw model size determines hardware requirements).
# effective_params captures memory footprint for headroom cal…
classify_failure function · python · L10-L65 (56 LOC)telemetry/telemetry/failure_classifier.py
def classify_failure(
final_readings: dict,
exit_code: int,
training_completed: bool,
) -> dict:
"""Classify a training run outcome from its final telemetry snapshot.
Args:
final_readings: Dict with keys mem_available_gb, gpu_util_pct,
cpu_pct, temperature_c, duration_at_state_s.
exit_code: Process exit code (0 = normal).
training_completed: Whether the training loop finished normally.
Returns:
Dict with "classification" (str) and "evidence" (dict).
HANG returns never contain a "batch_cap" key.
"""
if training_completed and exit_code == 0:
return {"classification": "clean", "evidence": {}}
mem_gb = final_readings.get("mem_available_gb", 99.0)
gpu_util = final_readings.get("gpu_util_pct", 100)
cpu_pct = final_readings.get("cpu_pct", 0)
temp_c = final_readings.get("temperature_c", 0)
duration_s = final_readings.get("duration_at_state_s", 0)
# OOM: GPU idle + near-zero m…
prepare_probe function · python · L18-L57 (40 LOC)telemetry/telemetry/probe.py
def prepare_probe(
current_config: dict,
proposed_changes: dict,
probe_dir: Path | None = None,
) -> dict:
"""Write rollback and probe configs for a configuration test.
Args:
current_config: The current training config dict.
proposed_changes: Dict of config keys to change for the probe.
probe_dir: Directory for probe files (default: /tmp/telemetry_probe).
Returns:
{"rollback_config_path": Path, "probe_config_path": Path, "results_path": Path}
"""
if probe_dir is None:
probe_dir = Path("/tmp/telemetry_probe")
probe_dir = Path(probe_dir)
probe_dir.mkdir(parents=True, exist_ok=True)
rollback_path = probe_dir / "rollback_config.json"
probe_path = probe_dir / "probe_config.json"
results_path = probe_dir / "probe_results.jsonl"
# Write rollback (original config)
rollback_path.write_text(json.dumps(current_config, indent=2), encoding="utf-8")
# Write probe (merged config: current overr…
evaluate_probe function · python · L60-L134 (75 LOC)telemetry/telemetry/probe.py
def evaluate_probe(
results_path: Path,
baseline: dict,
tier_headroom_pct: float,
jitter_margin_gb: float = 5.0,
) -> dict:
"""Evaluate probe results and recommend commit or revert.
Reads the results JSONL file, finds the minimum mem_available_gb
(peak memory usage = minimum available), compares against the
safe threshold from calculate_headroom.
Commit requires headroom_gb > 0 (strictly positive). Equal-to-threshold
reverts because zero headroom provides no safety margin.
Args:
results_path: Path to the JSONL file with probe telemetry records.
baseline: Baseline dict with "mem_available_gb" key.
tier_headroom_pct: Percentage of baseline memory to reserve as threshold.
jitter_margin_gb: Additional GB margin for jitter (default: 5.0).
Returns:
{"action": "commit"|"revert", "reason": str, "anchor_record": dict|None}
"""
results_path = Path(results_path)
lines = results_path.read_text(e…
GPUSampler class · python · L25-L151 (127 LOC)telemetry/telemetry/sampler.py
class GPUSampler:
"""Sample GPU and memory telemetry from a DGX Spark node.
Attributes:
_mock: True when NVML library is absent or unavailable.
_handle: NVML device handle (None in mock mode).
"""
def __init__(self) -> None:
"""Initialize the sampler.
Tries to initialize NVML and get a device handle. Sets mock mode
if any NVMLError is raised (covers libnvidia-ml.so.1 absent,
no GPU present, and permission errors).
"""
self._mock: bool = False
self._handle = None
try:
pynvml.nvmlInit()
self._handle = pynvml.nvmlDeviceGetHandleByIndex(0)
except pynvml.NVMLError:
self._mock = True
@property
def mock(self) -> bool:
"""Return True if running in mock mode (no GPU hardware available)."""
return self._mock
def sample(self) -> dict:
"""Sample current GPU and memory telemetry.
Returns:
Dict with k…
__init__ method · python · L33-L46 (14 LOC)telemetry/telemetry/sampler.py
def __init__(self) -> None:
    """Set up the sampler, falling back to mock mode without NVML.

    NVML is initialized and the handle for device index 0 is fetched. If
    either call raises ``pynvml.NVMLError`` the sampler switches to mock
    mode; the handle stays None when the failure happened before it was
    obtained.
    """
    # Defaults assume real hardware; only an NVML failure flips the flag.
    self._handle = None
    self._mock: bool = False
    try:
        pynvml.nvmlInit()
        device = pynvml.nvmlDeviceGetHandleByIndex(0)
        self._handle = device
    except pynvml.NVMLError:
        self._mock = True
# sample method · python · L53-L113 (61 LOC)telemetry/telemetry/sampler.py
def sample(self) -> dict:
"""Sample current GPU and memory telemetry.
Returns:
Dict with keys:
watts (float): GPU power draw in Watts.
temperature_c (int): GPU temperature in Celsius.
gpu_util_pct (int): GPU utilization percentage (0-100).
mem_available_gb (float): Available memory from /proc/meminfo MemAvailable.
page_cache_gb (float): Page cache size from /proc/meminfo Cached.
mock (bool): True if no GPU hardware was available.
Note:
Memory is ALWAYS read from /proc/meminfo, never from
nvmlDeviceGetMemoryInfo (GB10 UMA architecture — TELEM-02).
No subprocess calls are made at any point.
"""
if self._mock:
# Memory is always read from /proc/meminfo (UMA architecture).
# GPU NVML metrics (watts, temp, util) fall back to 0 when
# libnvidia-ml.so.1 is absent, but /p…
Powered by Repobility — scan your code at https://repobility.com
_read_meminfo method · python · L115-L139 (25 LOC)telemetry/telemetry/sampler.py
def _read_meminfo(self, key: str) -> float:
    """Look up one field of the meminfo file and return it in gigabytes.

    Args:
        key: Field name without the trailing colon (e.g. "MemAvailable").

    Returns:
        The field's second whitespace-separated token, read as an integer
        kB count and divided by 1024 * 1024. 0.0 when the file cannot be
        read, the key is absent, or the value is not an integer.
    """
    try:
        text = _MEMINFO_PATH.read_text()
    except OSError:
        return 0.0

    prefix = f"{key}:"
    for row in text.splitlines():
        if not row.startswith(prefix):
            continue
        # Format: "MemAvailable: 83886080 kB"
        fields = row.split()
        if len(fields) < 2:
            continue
        try:
            return int(fields[1]) / (1024 * 1024)
        except ValueError:
            return 0.0
    return 0.0
# append_jsonl method · python · L141-L151 (11 LOC)telemetry/telemetry/sampler.py
def append_jsonl(self, path: Union[str, Path]) -> None:
    """Take one sample and append it as a JSON line to ``path``.

    The record returned by ``self.sample()`` gets a ``ts`` key holding the
    current UTC time formatted as ``YYYY-MM-DDTHH:MM:SSZ``.

    Args:
        path: Output file, opened in append mode with UTF-8 encoding, so it
            is created when missing and extended otherwise.
    """
    entry = self.sample()
    entry["ts"] = time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime())
    with open(path, "a", encoding="utf-8") as sink:
        sink.write(f"{json.dumps(entry)}\n")
# UMAMemModel class · python · L20-L79 (60 LOC)telemetry/telemetry/uma_model.py
class UMAMemModel:
def __init__(self, sampler: GPUSampler):
self._sampler = sampler
def sample_baseline(self) -> dict:
"""Drop page cache (best-effort) then sample baseline memory state.
Returns dict with keys: mem_available_gb, page_cache_gb,
idle_watts, timestamp.
If drop_caches fails (non-root), logs a warning that the baseline
may include cached pages (dirty baseline). Addresses review concern:
callers should know when baseline is not clean.
"""
# Best-effort cache drop — requires root
try:
_DROP_CACHES_PATH.write_text("3")
except (PermissionError, OSError) as exc:
logger.warning(
"Could not drop page cache (%s) — baseline may include "
"cached pages (dirty baseline). Run as root for a clean "
"baseline.",
exc,
)
snapshot = self._sampler.sample()
return {
"m…
sample_baseline method · python · L24-L51 (28 LOC)telemetry/telemetry/uma_model.py
def sample_baseline(self) -> dict:
"""Drop page cache (best-effort) then sample baseline memory state.
Returns dict with keys: mem_available_gb, page_cache_gb,
idle_watts, timestamp.
If drop_caches fails (non-root), logs a warning that the baseline
may include cached pages (dirty baseline). Addresses review concern:
callers should know when baseline is not clean.
"""
# Best-effort cache drop — requires root
try:
_DROP_CACHES_PATH.write_text("3")
except (PermissionError, OSError) as exc:
logger.warning(
"Could not drop page cache (%s) — baseline may include "
"cached pages (dirty baseline). Run as root for a clean "
"baseline.",
exc,
)
snapshot = self._sampler.sample()
return {
"mem_available_gb": snapshot["mem_available_gb"],
"page_cache_gb": snapshot["page_cache…
calculate_headroom method · python · L54-L79 (26 LOC)telemetry/telemetry/uma_model.py
def calculate_headroom(
baseline: dict,
current: dict,
tier_headroom_pct: float,
jitter_margin_gb: float = 5.0,
) -> dict:
"""Calculate safe memory threshold and headroom.
safe_threshold = baseline_mem * (tier_headroom_pct/100) + jitter_margin_gb
headroom_gb = current_mem - safe_threshold
headroom_pct = headroom_gb / current_mem * 100 (0 if current_mem == 0)
UMA semantics: pin_memory always False, prefetch_factor capped at 4.
"""
baseline_mem = baseline.get("mem_available_gb", 0.0)
current_mem = current.get("mem_available_gb", 0.0)
safe_threshold = baseline_mem * (tier_headroom_pct / 100.0) + jitter_margin_gb
headroom_gb = current_mem - safe_threshold
headroom_pct = (headroom_gb / current_mem * 100.0) if current_mem > 0 else 0.0
return {
"safe_threshold": round(safe_threshold, 2),
"headroom_gb": round(headroom_gb, 2),
‹ prevpage 4 / 4