Function bodies 101 total
_to_list function · python · L114-L122 (9 LOC)src/dm_isaac_g1/data/convert.py
def _to_list(val: Any) -> List[float]:
"""Convert various types to float list."""
if val is None:
return []
if hasattr(val, "tolist"):
val = val.tolist()
if isinstance(val, (int, float)):
return [float(val)]
return [float(x) for x in val]detect_format function · python · L130-L189 (60 LOC)src/dm_isaac_g1/data/convert.py
def detect_format(info: Dict) -> Dict[str, Any]:
"""Detect dataset format from info.json.
Args:
info: Loaded info.json dictionary.
Returns:
Dictionary with format details (format, hand_type, state_shape, etc.).
"""
features = info.get("features", {})
feature_keys = list(features.keys())
result = {
"format": "unknown",
"hand_type": "gripper",
"has_body": False,
"has_separate_parts": False,
"has_observation_state": False,
"state_shape": None,
"joint_names": [],
}
# Check for observation.state (combined format)
if "observation.state" in features:
result["has_observation_state"] = True
result["state_shape"] = features["observation.state"].get("shape", [])
names = features["observation.state"].get("names", [])
if names and isinstance(names[0], list):
names = names[0]
result["joint_names"] = names
if result["state_sha_process_hospitality_row function · python · L197-L234 (38 LOC)src/dm_isaac_g1/data/convert.py
def _process_hospitality_row(row: Dict, hand_type: str) -> Tuple[List[float], List[float]]:
"""Process a row from hospitality format."""
# Get body state (29 DOF)
body = _to_list(row.get("observation.body", []))
if len(body) < 29:
body = body + [0.0] * (29 - len(body))
elif len(body) > 29:
body = body[:29]
# Get grippers
left_grip = _to_list(row.get("observation.left_gripper", [0.0]))
right_grip = _to_list(row.get("observation.right_gripper", [0.0]))
# Convert to Inspire
left_inspire = gripper_to_inspire(left_grip[0] if left_grip else 0.0)
right_inspire = gripper_to_inspire(right_grip[0] if right_grip else 0.0)
obs_state = body + left_inspire + right_inspire
# Actions
left_arm_action = _to_list(row.get("action.left_arm", []))
right_arm_action = _to_list(row.get("action.right_arm", []))
left_grip_action = _to_list(row.get("action.left_gripper", [0.0]))
right_grip_action = _to_list(row.get("action.rig_process_dex3_row function · python · L237-L271 (35 LOC)src/dm_isaac_g1/data/convert.py
def _process_dex3_row(row: Dict) -> Tuple[List[float], List[float]]:
"""Process a row from Dex3 combined format (28 DOF)."""
obs_state_28 = _to_list(row.get("observation.state", [0.0] * 28))
action_28 = _to_list(row.get("action", [0.0] * 28))
obs_state_28 = obs_state_28[:28] + [0.0] * max(0, 28 - len(obs_state_28))
action_28 = action_28[:28] + [0.0] * max(0, 28 - len(action_28))
# Parse 28 DOF: [left_arm(7), right_arm(7), left_dex3(7), right_dex3(7)]
left_arm = obs_state_28[0:7]
right_arm = obs_state_28[7:14]
left_dex3 = obs_state_28[14:21]
right_dex3 = obs_state_28[21:28]
# Build 53 DOF: legs(12) + waist(3) + arms(14) + hands(24)
body_29 = [0.0] * 15 + left_arm + right_arm
left_inspire = dex3_to_inspire(left_dex3)
right_inspire = dex3_to_inspire(right_dex3)
obs_state = body_29 + left_inspire + right_inspire
# Same for actions
left_arm_action = action_28[0:7]
right_arm_action = action_28[7:14]
left_dex3_a_process_trifinger_row function · python · L274-L311 (38 LOC)src/dm_isaac_g1/data/convert.py
def _process_trifinger_row(row: Dict) -> Tuple[List[float], List[float]]:
"""Process a row from teleop tri-finger format (43 DOF)."""
obs_state_43 = _to_list(row.get("observation.state", [0.0] * 43))
action_43 = _to_list(row.get("action", [0.0] * 43))
obs_state_43 = obs_state_43[:43] + [0.0] * max(0, 43 - len(obs_state_43))
action_43 = action_43[:43] + [0.0] * max(0, 43 - len(action_43))
# Parse 43 DOF
legs = obs_state_43[0:12]
waist = obs_state_43[12:15]
left_arm = obs_state_43[15:22]
left_trifinger = obs_state_43[22:29]
right_arm = obs_state_43[29:36]
right_trifinger = obs_state_43[36:43]
body_29 = legs + waist + left_arm + right_arm
left_inspire = trifinger_to_inspire(left_trifinger)
right_inspire = trifinger_to_inspire(right_trifinger)
obs_state = body_29 + left_inspire + right_inspire
# Same for actions
legs_action = action_43[0:12]
waist_action = action_43[12:15]
left_arm_action = action_43[15:2convert_to_inspire function · python · L319-L414 (96 LOC)src/dm_isaac_g1/data/convert.py
def convert_to_inspire(
input_path: Path,
output_path: Path,
hand_type: Optional[str] = None,
copy_videos: bool = True,
dry_run: bool = False,
) -> bool:
"""Convert dataset to 53 DOF Inspire format.
Args:
input_path: Path to input dataset.
output_path: Path for output dataset.
hand_type: Override detected hand type ("gripper", "dex3", "trifinger").
copy_videos: Whether to copy video files.
dry_run: If True, only analyze without converting.
Returns:
True if conversion successful.
"""
if pq is None:
raise ImportError("pyarrow required: pip install pyarrow")
input_path = Path(input_path)
output_path = Path(output_path)
print(f"\n{'='*60}")
print("Converting to Inspire format (53 DOF)")
print(f"{'='*60}")
print(f"Input: {input_path}")
print(f"Output: {output_path}")
# Load info.json
info_path = input_path / "meta" / "info.json"
if not info_path.exist_process_parquet_file function · python · L417-L468 (52 LOC)src/dm_isaac_g1/data/convert.py
def _process_parquet_file(
input_file: Path,
output_file: Path,
format_info: Dict,
info: Dict,
) -> int:
"""Process a single parquet file."""
table = pq.read_table(input_file)
cols = table.column_names
num_rows = table.num_rows
data = {col: table.column(col).to_pylist() for col in cols}
output_data = {
"observation.state": [],
"action": [],
}
# Passthrough columns
passthrough = ["frame_index", "episode_index", "timestamp", "task", "index", "task_index"]
for col in cols:
if any(p in col.lower() for p in passthrough):
output_data[col] = data[col]
elif "observation.images" in col:
output_data[col] = data[col]
fmt = format_info["format"]
for i in range(num_rows):
row = {col: data[col][i] for col in data}
if fmt == "hospitality":
obs_state, action = _process_hospitality_row(row, format_info["hand_type"])
elif fmt == "dex3_combined":Repobility — same analyzer, your code, free for public repos · /scan/
_generate_metadata function · python · L471-L546 (76 LOC)src/dm_isaac_g1/data/convert.py
def _generate_metadata(
output_path: Path,
info: Dict,
total_frames: int,
num_episodes: int,
format_info: Dict,
):
"""Generate metadata files for converted dataset."""
meta_dir = output_path / "meta"
meta_dir.mkdir(parents=True, exist_ok=True)
# Get video features
video_features = {
k: v for k, v in info.get("features", {}).items()
if "observation.images" in k
}
new_info = {
"codebase_version": "v2.1",
"robot_type": "unitree_g1_inspire",
"total_episodes": num_episodes,
"total_frames": total_frames,
"fps": info.get("fps", 30),
"splits": {"train": f"0:{num_episodes}"},
"data_path": "data/chunk-000/episode_{episode_index:06d}.parquet",
"features": {
"observation.state": {
"dtype": "float32",
"shape": [53],
"description": "G1 body (29 DOF) + Inspire hands (24 DOF)",
},
"action": _copy_support_files function · python · L549-L571 (23 LOC)src/dm_isaac_g1/data/convert.py
def _copy_support_files(input_path: Path, output_path: Path, copy_videos: bool):
"""Copy support files (episodes.jsonl, tasks.jsonl, videos)."""
import shutil
meta_in = input_path / "meta"
meta_out = output_path / "meta"
meta_out.mkdir(parents=True, exist_ok=True)
for filename in ["episodes.jsonl", "tasks.jsonl"]:
src = meta_in / filename
if src.exists():
shutil.copy2(src, meta_out / filename)
print(f" Copied {filename}")
if copy_videos:
video_dir = input_path / "videos"
if video_dir.exists():
output_video = output_path / "videos"
if output_video.exists():
shutil.rmtree(output_video)
print(f" Copying videos...")
shutil.copytree(video_dir, output_video)
print(f" Copied videos")_to_np function · python · L75-L82 (8 LOC)src/dm_isaac_g1/data/convert_to_groot.py
def _to_np(val, expected_len: int) -> np.ndarray:
"""Convert value to numpy array, padding/truncating to expected length."""
if val is None:
return np.zeros(expected_len, dtype=np.float32)
arr = np.asarray(val, dtype=np.float32).flatten()
if len(arr) < expected_len:
arr = np.pad(arr, (0, expected_len - len(arr)))
return arr[:expected_len]process_row function · python · L85-L126 (42 LOC)src/dm_isaac_g1/data/convert_to_groot.py
def process_row(row: dict) -> tuple:
"""Convert a single row from hospitality format to UNITREE_G1 format.
Returns:
(observation_state, action) as numpy arrays.
"""
# --- Build observation.state (31 DOF) ---
body = _to_np(row.get("observation.body"), 29)
left_gripper = _to_np(row.get("observation.left_gripper"), 1)
right_gripper = _to_np(row.get("observation.right_gripper"), 1)
# body layout: [left_leg(6), right_leg(6), waist(3), left_arm(7), right_arm(7)]
obs_state = np.concatenate([
body, # 29 DOF (legs + waist + arms)
left_gripper, # 1 DOF
right_gripper, # 1 DOF
]) # Total: 31
# --- Build action (23 DOF) ---
left_arm_action = _to_np(row.get("action.left_arm"), 7)
right_arm_action = _to_np(row.get("action.right_arm"), 7)
left_grip_action = _to_np(row.get("action.left_gripper"), 1)
right_grip_action = _to_np(row.get("action.right_gripper"), 1)
body_action = _to_np(row.get(convert_parquet_file function · python · L129-L175 (47 LOC)src/dm_isaac_g1/data/convert_to_groot.py
def convert_parquet_file(
input_file: Path,
output_file: Path,
ego_camera: str,
) -> int:
"""Convert a single parquet file."""
table = pq.read_table(input_file)
cols = table.column_names
num_rows = table.num_rows
data = {col: table.column(col).to_pylist() for col in cols}
obs_states = []
actions = []
for i in range(num_rows):
row = {col: data[col][i] for col in data}
obs_state, action = process_row(row)
obs_states.append(obs_state.tolist())
actions.append(action.tolist())
# Build output columns
output_data = {
"observation.state": obs_states,
"action": actions,
}
# Passthrough metadata columns
passthrough_cols = [
"frame_index", "episode_index", "timestamp",
"index", "task_index",
]
for col in passthrough_cols:
if col in data:
output_data[col] = data[col]
# Keep only the ego camera image reference, renamed to ego_view
generate_modality_json function · python · L178-L214 (37 LOC)src/dm_isaac_g1/data/convert_to_groot.py
def generate_modality_json(output_path: Path, ego_camera: str):
"""Generate modality.json for UNITREE_G1 embodiment."""
modality = {
"state": {
name: {
"start": info["start"],
"end": info["end"],
"original_key": "observation.state",
}
for name, info in STATE_LAYOUT.items()
},
"action": {
name: {
"start": info["start"],
"end": info["end"],
"original_key": "action",
}
for name, info in ACTION_LAYOUT.items()
},
"video": {
"ego_view": {
"original_key": "observation.images.ego_view",
},
},
"annotation": {
"human.task_description": {
"original_key": "task_index",
},
},
}
meta_dir = output_path / "meta"
meta_dir.mkdir(parents=True, exist_ok=True)
with open(meta_generate_info_json function · python · L217-L290 (74 LOC)src/dm_isaac_g1/data/convert_to_groot.py
def generate_info_json(
output_path: Path,
source_info: dict,
total_frames: int,
num_episodes: int,
ego_camera: str,
):
"""Generate info.json for converted dataset."""
ego_source_key = f"observation.images.{ego_camera}"
ego_video_key = "observation.images.ego_view"
source_features = source_info.get("features", {})
video_info = source_features.get(ego_source_key, {})
new_info = {
"codebase_version": "v2.1",
"robot_type": "unitree_g1",
"total_episodes": num_episodes,
"total_frames": total_frames,
"total_tasks": source_info.get("total_tasks", 1),
"total_videos": num_episodes,
"total_chunks": source_info.get("total_chunks", 1),
"chunks_size": source_info.get("chunks_size", 1000),
"fps": source_info.get("fps", 30),
"splits": source_info.get("splits", {"train": f"0:{num_episodes}"}),
"data_path": "data/chunk-{episode_chunk:03d}/episode_{episode_index:06d}.parquconvert_dataset function · python · L293-L419 (127 LOC)src/dm_isaac_g1/data/convert_to_groot.py
def convert_dataset(
input_path: Path,
output_path: Path,
ego_camera: str = "cam_left_high",
dry_run: bool = False,
) -> bool:
"""Convert hospitality dataset to GR00T UNITREE_G1 format.
Args:
input_path: Path to input hospitality dataset.
output_path: Path for output GR00T-formatted dataset.
ego_camera: Camera to use as ego_view (default: cam_left_high).
dry_run: If True, analyze only.
Returns:
True if successful.
"""
if pq is None:
raise ImportError("pyarrow required: pip install pyarrow")
input_path = Path(input_path)
output_path = Path(output_path)
print(f"\n{'='*60}")
print("Converting to GR00T UNITREE_G1 format")
print(f"{'='*60}")
print(f"Input: {input_path}")
print(f"Output: {output_path}")
print(f"Ego camera: {ego_camera}")
# Load source info
info_path = input_path / "meta" / "info.json"
if not info_path.exists():
print(f"ERROR: {Provenance: Repobility (https://repobility.com) — every score reproducible from /scan/
main function · python · L422-L442 (21 LOC)src/dm_isaac_g1/data/convert_to_groot.py
def main():
    """Command-line entry point for the hospitality -> GR00T conversion.

    Parses ``--input``, ``--output``, ``--ego-camera`` and ``--dry-run`` and
    forwards them to ``convert_dataset``.

    Returns:
        0 when ``convert_dataset`` reports success, 1 otherwise.
    """
    import argparse

    cli = argparse.ArgumentParser(
        description="Convert hospitality dataset to GR00T UNITREE_G1 format"
    )
    cli.add_argument("--input", required=True, help="Input dataset path")
    cli.add_argument("--output", required=True, help="Output dataset path")
    cli.add_argument(
        "--ego-camera",
        default="cam_left_high",
        help="Camera to use as ego_view (default: cam_left_high)",
    )
    cli.add_argument("--dry-run", action="store_true", help="Analyze only")
    opts = cli.parse_args()

    source_dir = Path(opts.input)
    target_dir = Path(opts.output)
    succeeded = convert_dataset(
        source_dir,
        target_dir,
        ego_camera=opts.ego_camera,
        dry_run=opts.dry_run,
    )
    if succeeded:
        return 0
    return 1
# download_dataset function · python · L25-L100 (76 LOC)src/dm_isaac_g1/data/download.py
def download_dataset(
repo_id: str,
output_dir: Path,
use_lfs: bool = True,
hf_token: Optional[str] = None,
) -> Path:
"""Download a dataset from HuggingFace.
Args:
repo_id: HuggingFace repository ID (e.g., "unitreerobotics/G1_Fold_Towel").
output_dir: Directory to download to.
use_lfs: Use git LFS for large files (recommended for video datasets).
hf_token: HuggingFace token for private repos.
Returns:
Path to downloaded dataset.
Raises:
RuntimeError: If download fails.
"""
output_dir = Path(output_dir)
output_dir.mkdir(parents=True, exist_ok=True)
# Extract dataset name from repo_id
dataset_name = repo_id.split("/")[-1]
dataset_path = output_dir / dataset_name
if dataset_path.exists():
print(f"Dataset already exists: {dataset_path}")
return dataset_path
if use_lfs:
# Use git clone with LFS (better for large files)
print(f"Cloning {repo_idownload_hospitality_datasets function · python · L103-L135 (33 LOC)src/dm_isaac_g1/data/download.py
def download_hospitality_datasets(
    output_dir: Path,
    datasets: Optional[List[str]] = None,
    use_lfs: bool = True,
    hf_token: Optional[str] = None,
) -> List[Path]:
    """Download the hospitality datasets, skipping any that fail.

    Args:
        output_dir: Directory to download into.
        datasets: Repository IDs to fetch. ``None`` or an empty list falls
            back to ``HOSPITALITY_DATASETS``.
        use_lfs: Forwarded to ``download_dataset``.
        hf_token: Forwarded to ``download_dataset``.

    Returns:
        Paths returned by ``download_dataset`` for the repositories that
        downloaded without raising; failures are printed and omitted.
    """
    repo_ids = datasets if datasets else HOSPITALITY_DATASETS
    fetched: List[Path] = []
    for repo_id in repo_ids:
        try:
            fetched.append(
                download_dataset(
                    repo_id=repo_id,
                    output_dir=output_dir,
                    use_lfs=use_lfs,
                    hf_token=hf_token,
                )
            )
        except Exception as err:
            # Best effort: one failed repository must not abort the batch.
            print(f"Failed to download {repo_id}: {err}")
    return fetched
# download_dex3_datasets function · python · L138-L167 (30 LOC)src/dm_isaac_g1/data/download.py
def download_dex3_datasets(
    output_dir: Path,
    use_lfs: bool = True,
    hf_token: Optional[str] = None,
) -> List[Path]:
    """Download every repository listed in ``DEX3_DATASETS``.

    Args:
        output_dir: Directory to download into.
        use_lfs: Forwarded to ``download_dataset``.
        hf_token: Forwarded to ``download_dataset``.

    Returns:
        Paths returned by ``download_dataset`` for the repositories that
        downloaded without raising; failures are printed and omitted.
    """
    fetched: List[Path] = []
    for repo_id in DEX3_DATASETS:
        try:
            fetched.append(
                download_dataset(
                    repo_id=repo_id,
                    output_dir=output_dir,
                    use_lfs=use_lfs,
                    hf_token=hf_token,
                )
            )
        except Exception as err:
            # Best effort: keep going when a single repository fails.
            print(f"Failed to download {repo_id}: {err}")
    return fetched
# compute_stats function · python · L15-L108 (94 LOC)src/dm_isaac_g1/data/stats.py
def compute_stats(
dataset_path: Path,
keys: Optional[List[str]] = None,
output_file: Optional[Path] = None,
) -> Dict[str, Dict[str, List[float]]]:
"""Compute normalization statistics for dataset.
Computes mean, std, min, max for observation.state and action columns.
Args:
dataset_path: Path to dataset.
keys: Specific columns to compute stats for. Defaults to
["observation.state", "action"].
output_file: If provided, saves stats to this file.
Returns:
Dictionary mapping column names to their statistics.
"""
if pq is None:
raise ImportError("pyarrow required: pip install pyarrow")
dataset_path = Path(dataset_path)
keys = keys or ["observation.state", "action"]
# Find all parquet files
parquet_files = sorted((dataset_path / "data").rglob("*.parquet"))
if not parquet_files:
raise ValueError(f"No parquet files found in {dataset_path / 'data'}")
print(f"Computingload_stats function · python · L111-L126 (16 LOC)src/dm_isaac_g1/data/stats.py
def load_stats(dataset_path: Path) -> Dict[str, Dict[str, List[float]]]:
    """Load statistics from ``<dataset_path>/meta/stats.json``.

    Args:
        dataset_path: Path to the dataset root.

    Returns:
        The parsed contents of ``stats.json``.

    Raises:
        FileNotFoundError: If ``meta/stats.json`` does not exist.
    """
    stats_file = Path(dataset_path) / "meta" / "stats.json"
    if not stats_file.exists():
        raise FileNotFoundError(f"Stats file not found: {stats_file}")
    # Decode explicitly as UTF-8 so parsing does not depend on the platform's
    # locale default encoding.
    with open(stats_file, encoding="utf-8") as f:
        return json.load(f)
# normalize function · python · L129-L155 (27 LOC)src/dm_isaac_g1/data/stats.py
def normalize(
    data: np.ndarray,
    stats: Dict[str, List[float]],
    method: str = "standard",
) -> np.ndarray:
    """Normalize data using precomputed statistics.

    Args:
        data: Data array to normalize.
        stats: Statistics dictionary; ``"standard"`` reads ``mean``/``std``
            and ``"minmax"`` reads ``min``/``max``.
        method: Normalization method (``"standard"`` or ``"minmax"``).

    Returns:
        Normalized data array.

    Raises:
        ValueError: If ``method`` is not a known normalization method.
    """
    if method == "standard":
        mean = np.array(stats["mean"])
        std = np.array(stats["std"])
        # A constant dimension has std == 0; dividing by it would produce
        # inf/NaN. Use a unit divisor there so such dimensions map to
        # (data - mean) and still round-trip through ``data * std + mean``.
        safe_std = np.where(std == 0, 1.0, std)
        return (data - mean) / safe_std
    elif method == "minmax":
        min_val = np.array(stats["min"])
        max_val = np.array(stats["max"])
        # The small epsilon keeps the divisor non-zero when max == min.
        return (data - min_val) / (max_val - min_val + 1e-6)
    else:
        raise ValueError(f"Unknown normalization method: {method}")
# denormalize function · python · L158-L184 (27 LOC)src/dm_isaac_g1/data/stats.py
def denormalize(
    data: np.ndarray,
    stats: Dict[str, List[float]],
    method: str = "standard",
) -> np.ndarray:
    """Invert ``normalize`` using the same statistics.

    Args:
        data: Normalized data array.
        stats: Statistics dictionary; ``"standard"`` reads ``mean``/``std``
            and ``"minmax"`` reads ``min``/``max``.
        method: Normalization method that was used.

    Returns:
        Denormalized data array.

    Raises:
        ValueError: If ``method`` is not a known normalization method.
    """
    if method == "standard":
        mean = np.array(stats["mean"])
        std = np.array(stats["std"])
        return data * std + mean
    elif method == "minmax":
        min_val = np.array(stats["min"])
        max_val = np.array(stats["max"])
        # ``normalize`` divides by (max - min + 1e-6); multiply by the same
        # quantity so the round trip is exact.
        return data * (max_val - min_val + 1e-6) + min_val
    else:
        raise ValueError(f"Unknown normalization method: {method}")
validate_dataset function · python · L24-L123 (100 LOC)src/dm_isaac_g1/data/validate.py
def validate_dataset(dataset_path: Path) -> ValidationResult:
"""Validate dataset structure and contents.
Checks for:
- Required meta files (info.json, modality.json, episodes.jsonl, tasks.jsonl)
- Parquet file existence and schema
- Video file existence (if referenced)
- Consistency between meta files and data
Args:
dataset_path: Path to dataset directory.
Returns:
ValidationResult with validation status and any issues found.
"""
dataset_path = Path(dataset_path)
errors = []
warnings = []
info = {}
# Check directory exists
if not dataset_path.exists():
return ValidationResult(
valid=False,
errors=[f"Dataset path does not exist: {dataset_path}"],
)
meta_dir = dataset_path / "meta"
data_dir = dataset_path / "data"
# Check meta directory
if not meta_dir.exists():
errors.append("Missing meta/ directory")
else:
# Check required meta ffix_episodes function · python · L126-L219 (94 LOC)src/dm_isaac_g1/data/validate.py
def fix_episodes(
dataset_path: Path,
task_description: Optional[str] = None,
) -> bool:
"""Create or fix episodes.jsonl and tasks.jsonl files.
Args:
dataset_path: Path to dataset.
task_description: Task description to use. Auto-detected if None.
Returns:
True if files were created/fixed successfully.
"""
if pq is None:
raise ImportError("pyarrow required: pip install pyarrow")
dataset_path = Path(dataset_path)
meta_dir = dataset_path / "meta"
meta_dir.mkdir(parents=True, exist_ok=True)
episodes_file = meta_dir / "episodes.jsonl"
tasks_file = meta_dir / "tasks.jsonl"
# Load info.json
info_file = meta_dir / "info.json"
if not info_file.exists():
print(f"ERROR: info.json not found in {meta_dir}")
return False
with open(info_file) as f:
info = json.load(f)
total_episodes = info.get("total_episodes", 0)
# Count parquet files if total_episodes not set
ivalidate_config function · python · L197-L216 (20 LOC)src/dm_isaac_g1/finetuning/configs/g1_inspire_53dof.py
def validate_config():
    """Validate the joint-index configuration and print a summary.

    Checks that every entry of ``JOINT_INDEX_RANGES`` spans as many indices
    as ``G1_INSPIRE_JOINT_NAMES`` lists joints for the same key, and that the
    spans add up to 53 DOF.

    Raises:
        AssertionError: If a range length disagrees with its joint count or
            the total is not 53.
    """
    total_dof = 0
    for key, (start, end) in JOINT_INDEX_RANGES.items():
        dof = end - start
        total_dof += dof
        joint_count = len(G1_INSPIRE_JOINT_NAMES[key])
        # Raised explicitly (not via ``assert``) so the check still runs when
        # Python is started with -O, which strips assert statements.
        if dof != joint_count:
            raise AssertionError(
                f"{key}: index range {dof} != joint count {joint_count}"
            )
    if total_dof != 53:
        raise AssertionError(f"Total DOF should be 53, got {total_dof}")
    print(f"Configuration validated: {total_dof} DOF")
    # Print summary
    print("\nJoint Configuration Summary:")
    print("-" * 40)
    for key, joints in G1_INSPIRE_JOINT_NAMES.items():
        start, end = JOINT_INDEX_RANGES[key]
        print(f" {key}: {len(joints)} DOF (indices {start}-{end-1})")
    print("-" * 40)
    print(f" TOTAL: {total_dof} DOF")
# build_finetune_command function · python · L110-L167 (58 LOC)src/dm_isaac_g1/finetuning/launcher.py
def build_finetune_command(args: FinetuneArgs) -> list[str]:
"""Build the torchrun command to launch fine-tuning.
Args:
args: Fine-tuning arguments
Returns:
Command as list of strings
"""
if not args.datasets:
raise ValueError("At least one dataset path must be provided in args.datasets")
script = "/workspace/Isaac-GR00T/gr00t/experiment/launch_finetune.py"
# Use torchrun for distributed training (matching official approach)
cmd = [
"torchrun",
f"--nproc_per_node={args.num_gpus}",
"--master_port=29500",
script,
"--base_model_path", args.base_model,
"--dataset_path", args.datasets[0],
"--embodiment_tag", args.embodiment_tag,
"--output_dir", args.output,
"--max_steps", str(args.max_steps),
"--save_steps", str(args.save_steps),
"--global_batch_size", str(args.batch_size),
"--learning_rate", str(args.learning_rate),
"--weight_delaunch_finetune function · python · L170-L200 (31 LOC)src/dm_isaac_g1/finetuning/launcher.py
def launch_finetune(args: FinetuneArgs, dry_run: bool = False) -> int:
    """Print the fine-tuning command and, unless dry-running, execute it.

    Args:
        args: Fine-tuning arguments passed to ``build_finetune_command``.
        dry_run: If True, print the command but do not execute it.

    Returns:
        0 for a dry run, 1 when ``/workspace/Isaac-GR00T`` is missing,
        otherwise the subprocess return code.
    """
    cmd = build_finetune_command(args)

    rule = "=" * 60
    print(rule)
    print("GROOT Fine-tuning Command:")
    print(" ".join(cmd))
    print(rule)

    if dry_run:
        print("(dry run - not executing)")
        return 0

    # The command is run from inside the Isaac-GR00T checkout, so it must exist.
    groot_path = Path("/workspace/Isaac-GR00T")
    if not groot_path.exists():
        print(f"ERROR: Isaac-GR00T not found at {groot_path}")
        print("This script must be run on the workstation with Isaac-GR00T installed.")
        return 1

    completed = subprocess.run(cmd, cwd=str(groot_path))
    return completed.returncode
# main function · python · L203-L271 (69 LOC)src/dm_isaac_g1/finetuning/launcher.py
def main():
"""CLI entry point."""
import argparse
parser = argparse.ArgumentParser(
description="Launch GROOT fine-tuning via torchrun",
formatter_class=argparse.ArgumentDefaultsHelpFormatter
)
# Required
parser.add_argument("--base-model", default="nvidia/GR00T-N1.6-3B",
help="Base model path or HuggingFace ID")
parser.add_argument("--datasets", nargs="+", required=True,
help="Dataset path (first dataset used)")
parser.add_argument("--config", default="",
help="Modality config file (empty for pre-registered embodiments)")
parser.add_argument("--output", default="./checkpoints/groot_finetuned",
help="Output directory")
# Training
parser.add_argument("--max-steps", type=int, default=10000)
parser.add_argument("--save-steps", type=int, default=2000)
parser.add_argument("--save-total-limit", type=int, default=2)
parser.add_GrootClient.__init__ method · python · L110-L133 (24 LOC)src/dm_isaac_g1/inference/client.py
def __init__(
    self,
    host: Optional[str] = None,
    port: Optional[int] = None,
    timeout: float = 30.0,
    config: Optional[Config] = None,
):
    """Initialize the GROOT client without opening a connection.

    Args:
        host: GROOT server host; a falsy value falls back to
            ``config.groot_server_host``.
        port: GROOT server port; a falsy value falls back to
            ``config.groot_server_port``.
        timeout: Request timeout in seconds (stored in milliseconds).
        config: Optional Config instance; loaded via ``load_config()`` when
            omitted.
    """
    if config is None:
        from dm_isaac_g1.core.config import load_config

        config = load_config()

    resolved_host = host if host else config.groot_server_host
    resolved_port = port if port else config.groot_server_port

    self.host = resolved_host
    self.port = resolved_port
    self.timeout_ms = int(timeout * 1000)
    self._context = zmq.Context()
    # The socket is created on demand by ``_connect``.
    self._socket: Optional[zmq.Socket] = None
# GrootClient._connect method · python · L135-L142 (8 LOC)src/dm_isaac_g1/inference/client.py
def _connect(self):
    """Create and connect the REQ socket if one is not already cached."""
    if self._socket is not None:
        return
    sock = self._socket = self._context.socket(zmq.REQ)
    socket_options = (
        (zmq.RCVTIMEO, self.timeout_ms),
        (zmq.SNDTIMEO, self.timeout_ms),
        (zmq.LINGER, 0),
    )
    for option, value in socket_options:
        sock.setsockopt(option, value)
    sock.connect(f"tcp://{self.host}:{self.port}")
GrootClient.health_check method · python · L144-L165 (22 LOC)src/dm_isaac_g1/inference/client.py
def health_check(self) -> bool:
    """Check if the GROOT server is healthy.

    Sends a ``ping`` request over a dedicated short-lived socket with a
    3 second send/receive timeout, so the cached request socket is not
    touched.

    Returns:
        True if any response arrived, False if a ``zmq.ZMQError`` occurred.
    """
    test_socket = None
    try:
        # Use a separate socket with short timeout for health check
        test_socket = self._context.socket(zmq.REQ)
        test_socket.setsockopt(zmq.RCVTIMEO, 3000)  # 3 second timeout
        test_socket.setsockopt(zmq.SNDTIMEO, 3000)
        test_socket.setsockopt(zmq.LINGER, 0)
        test_socket.connect(f"tcp://{self.host}:{self.port}")
        # Send ping request
        request = {"endpoint": "ping"}
        test_socket.send(MsgSerializer.to_bytes(request))
        test_socket.recv()  # Wait for any response
        return True
    except zmq.ZMQError:
        return False
    finally:
        # Close on every path: a timed-out probe previously left its socket
        # open because close() was only reached after a successful recv().
        if test_socket is not None:
            test_socket.close()
# GrootClient.get_action method · python · L167-L310 (144 LOC)src/dm_isaac_g1/inference/client.py
def get_action(
self,
observation: np.ndarray,
images: Optional[dict[str, np.ndarray]] = None,
image: Optional[np.ndarray] = None,
task: Optional[str] = None,
action_horizon: int = 16,
execute_steps: int = 1,
return_full_trajectory: bool = False,
) -> Union[np.ndarray, dict]:
"""Get action from the GROOT model.
GR00T 1.6 predicts action chunks (typically 8-16 steps). This method
allows configuring the action horizon at runtime for optimal performance.
Args:
observation: Robot state observation (28 DOF for G1+Dex3).
images: Dict mapping camera names to RGB images (H, W, 3).
Expected keys: cam_left_high, cam_right_high,
cam_left_wrist, cam_right_wrist.
image: Deprecated. Single RGB image sent as cam_left_high only.
Use `images` dict instead for multi-camera models.
task: OptionaGrootClient.get_policy_info method · python · L312-L321 (10 LOC)src/dm_isaac_g1/inference/client.py
def get_policy_info(self) -> dict:
    """Get information about the loaded policy.

    Returns:
        The deserialized server response to a ``get_policy_info`` request.

    Raises:
        zmq.ZMQError: If sending or receiving fails (e.g. on timeout). The
            cached socket is discarded first so the next call reconnects.
    """
    self._connect()
    request = {"endpoint": "get_policy_info"}
    try:
        self._socket.send(MsgSerializer.to_bytes(request))
        reply = self._socket.recv()
    except zmq.ZMQError:
        # A REQ socket must alternate send/recv; after a failed exchange it
        # cannot be reused, so drop it and let _connect() build a fresh one.
        self._socket.close()
        self._socket = None
        raise
    return MsgSerializer.from_bytes(reply)
# GrootClient.close method · python · L323-L328 (6 LOC)src/dm_isaac_g1/inference/client.py
def close(self):
    """Close the cached socket, if any, then terminate the ZeroMQ context."""
    open_socket = self._socket
    if open_socket:
        open_socket.close()
        self._socket = None
    self._context.term()
# GrootClientAsync.__init__ method · python · L351-L379 (29 LOC)src/dm_isaac_g1/inference/client.py
def __init__(
    self,
    host: Optional[str] = None,
    port: Optional[int] = None,
    timeout: float = 30.0,
    config: Optional[Config] = None,
):
    """Initialize the async GROOT client as a wrapper around ``GrootClient``.

    Args:
        host: GROOT server host; a falsy value falls back to
            ``config.groot_server_host``.
        port: GROOT server port; a falsy value falls back to
            ``config.groot_server_port``.
        timeout: Request timeout in seconds.
        config: Optional Config instance; loaded via ``load_config()`` when
            omitted.
    """
    if config is None:
        from dm_isaac_g1.core.config import load_config

        config = load_config()

    resolved_host = host if host else config.groot_server_host
    resolved_port = port if port else config.groot_server_port

    self.host = resolved_host
    self.port = resolved_port
    self.timeout_ms = int(timeout * 1000)
    # Use sync client internally since zmq async requires zmq.asyncio
    self._sync_client = GrootClient(
        host=resolved_host,
        port=resolved_port,
        timeout=timeout,
        config=config,
    )
# GrootClientAsync.get_action method · python · L387-L421 (35 LOC)src/dm_isaac_g1/inference/client.py
async def get_action(
self,
observation: np.ndarray,
images: Optional[dict[str, np.ndarray]] = None,
image: Optional[np.ndarray] = None,
task: Optional[str] = None,
action_horizon: int = 16,
execute_steps: int = 1,
) -> np.ndarray:
"""Get action from the GROOT model asynchronously.
Args:
observation: Robot state observation (28 DOF for Dex3).
images: Dict mapping camera names to RGB images.
image: Deprecated. Single RGB image. Use `images` instead.
task: Optional task description.
action_horizon: Number of steps to predict (8-16 recommended).
execute_steps: Number of steps to return for execution.
Returns:
Action array. Shape (28,) if execute_steps=1, else (execute_steps, 28).
"""
import asyncio
loop = asyncio.get_event_loop()
return await loop.run_in_executor(
None,
IsaacSimRunner.__init__ method · python · L90-L100 (11 LOC)src/dm_isaac_g1/inference/isaac_runner.py
def __init__(self, config: Optional[Config] = None):
    """Initialize the Isaac Sim runner.

    Args:
        config: Configuration instance; ``load_config()`` is used when a
            falsy value is given.
    """
    resolved_config = config if config else load_config()
    self.config = resolved_config
    self.workstation = WorkstationConnection(config=resolved_config)
    # Populated later by ``setup``.
    self.groot_client: Optional[GrootClient] = None
    self.current_env: Optional[IsaacEnv] = None
    self._session_id: Optional[str] = None
# IsaacSimRunner.setup method · python · L102-L133 (32 LOC)src/dm_isaac_g1/inference/isaac_runner.py
def setup(
    self,
    env: IsaacEnv = IsaacEnv.PICK_REDBLOCK,
    groot_host: Optional[str] = None,
    groot_port: Optional[int] = None,
) -> bool:
    """Record the environment and create a health-checked GROOT client.

    Args:
        env: Isaac environment to use.
        groot_host: GROOT server host, forwarded to ``GrootClient``.
        groot_port: GROOT server port, forwarded to ``GrootClient``.

    Returns:
        True if the server answered the health check, False otherwise.
    """
    self.current_env = env

    client = GrootClient(
        host=groot_host,
        port=groot_port,
        config=self.config,
    )
    self.groot_client = client

    # NOTE(review): both messages read ``client.base_url``; confirm that
    # GrootClient actually defines this attribute.
    if client.health_check():
        print(f"Connected to GROOT server at {client.base_url}")
        print(f"Environment: {env.value}")
        return True

    print(f"Warning: GROOT server not responding at {client.base_url}")
    return False
IsaacSimRunner.run_episode method · python · L135-L197 (63 LOC)src/dm_isaac_g1/inference/isaac_runner.py
def run_episode(
self,
task: str = "Complete the manipulation task",
max_steps: int = 500,
action_horizon: int = 16,
execute_steps: int = 1,
render: bool = False,
record: bool = False,
) -> EpisodeResult:
"""Run a single episode using GROOT for control.
This executes the simulation on the workstation and uses
the GROOT server for action inference.
Args:
task: Task description for language conditioning.
max_steps: Maximum steps per episode.
action_horizon: Number of steps GROOT predicts (8-16 recommended).
execute_steps: Steps to execute before re-planning (1=receding horizon).
render: Whether to render the simulation.
record: Whether to record video.
Returns:
EpisodeResult with success status and metrics.
"""
if self.groot_client is None or self.current_env is None:
raisIsaacSimRunner._parse_episode_result method · python · L413-L434 (22 LOC)src/dm_isaac_g1/inference/isaac_runner.py
def _parse_episode_result(self, stdout: str) -> EpisodeResult:
    """Build an EpisodeResult from the first ``RESULT:`` line of ``stdout``.

    Args:
        stdout: Script output, scanned line by line.

    Returns:
        An EpisodeResult populated from the JSON payload that follows the
        first ``RESULT:`` prefix, or a failed result with zero reward and
        zero steps when no such line exists.
    """
    import json

    marker = "RESULT:"
    tagged_lines = (ln for ln in stdout.split("\n") if ln.startswith(marker))
    first_tagged = next(tagged_lines, None)
    if first_tagged is None:
        return EpisodeResult(success=False, total_reward=0.0, steps=0)

    # Everything after the prefix is the JSON payload.
    payload = json.loads(first_tagged[len(marker):])
    return EpisodeResult(
        success=payload.get("success", False),
        total_reward=payload.get("total_reward", 0.0),
        steps=payload.get("steps", 0),
        metrics=payload,
    )
# IsaacSimRunner.run_benchmark method · python · L436-L477 (42 LOC)src/dm_isaac_g1/inference/isaac_runner.py
def run_benchmark(
self,
env: IsaacEnv,
num_episodes: int = 10,
task: str = "Complete the manipulation task",
) -> Dict[str, Any]:
"""Run multiple episodes and compute statistics.
Args:
env: Environment to benchmark.
num_episodes: Number of episodes to run.
task: Task description.
Returns:
Dictionary with benchmark results.
"""
self.setup(env)
results = []
for i in range(num_episodes):
print(f"Episode {i+1}/{num_episodes}...")
result = self.run_episode(task=task)
results.append(result)
print(f" Success: {result.success}, Reward: {result.total_reward:.2f}")
# Compute statistics
successes = [r.success for r in results]
rewards = [r.total_reward for r in results]
steps = [r.steps for r in results]
return {
"environment": env.value,
IsaacSimRunner.list_available_envs method · python · L479-L485 (7 LOC)src/dm_isaac_g1/inference/isaac_runner.py
def list_available_envs(self) -> List[str]:
    """Return the value of every ``IsaacEnv`` member.

    Returns:
        List of environment names, in enum iteration order.
    """
    return [member.value for member in IsaacEnv]
# GrootServerManager.__init__ method · python · L42-L51 (10 LOC)src/dm_isaac_g1/inference/server.py
def __init__(self, config: Optional[Config] = None):
    """Create a server manager bound to a configuration.

    Args:
        config: Configuration instance. When omitted (or falsy),
            ``load_config()`` supplies one.
    """
    settings = config if config else load_config()
    self.config = settings

    # Cache the remote connection details read from the configuration.
    self._spark_host = settings.groot_server_host
    self._spark_user = settings.spark_user
    self._spark_password = settings.spark_password
# GrootServerManager._ssh_exec method · python · L53-L83 (31 LOC)src/dm_isaac_g1/inference/server.py
def _ssh_exec(self, command: str, timeout: int = 60) -> str:
    """Execute a command on the Spark server via SSH and return its stdout.

    The password is handed to ``sshpass`` through the ``SSHPASS``
    environment variable (``sshpass -e``) instead of ``-p`` so that it does
    not appear in the argument list of the local process.

    Args:
        command: Command to execute on the remote host.
        timeout: Timeout in seconds for the whole SSH call.

    Returns:
        Captured standard output of the command. The exit status and
        stderr are not inspected.

    Raises:
        subprocess.TimeoutExpired: If the call runs longer than ``timeout``.
    """
    import os
    import subprocess

    # NOTE(review): StrictHostKeyChecking=no skips host-key verification;
    # kept as-is for compatibility, but confirm this is acceptable.
    ssh_cmd = [
        "sshpass",
        "-e",
        "ssh",
        "-o",
        "StrictHostKeyChecking=no",
        f"{self._spark_user}@{self._spark_host}",
        command,
    ]
    # Copy the current environment and add the password for sshpass only.
    child_env = dict(os.environ, SSHPASS=self._spark_password)
    result = subprocess.run(
        ssh_cmd,
        capture_output=True,
        text=True,
        timeout=timeout,
        env=child_env,
    )
    return result.stdout
# GrootServerManager.status method · python · L85-L123 (39 LOC)src/dm_isaac_g1/inference/server.py
def status(self) -> ServerStatus:
"""Get current server status.
Returns:
ServerStatus with running state and model info.
"""
try:
# Check if server process is running
output = self._ssh_exec("pgrep -f 'run_gr00t_server' || echo 'not_running'")
if "not_running" in output:
return ServerStatus(running=False)
pid = int(output.strip().split()[0]) if output.strip() else None
# Get GPU memory usage
gpu_output = self._ssh_exec(
"nvidia-smi --query-compute-apps=pid,used_memory "
"--format=csv,noheader,nounits 2>/dev/null || echo ''"
)
gpu_memory = None
if gpu_output.strip() and pid:
for line in gpu_output.strip().split("\n"):
parts = line.split(",")
if len(parts) >= 2 and str(pid) in parts[0]:
gpu_memory = int(GrootServerManager.start method · python · L125-L169 (45 LOC)src/dm_isaac_g1/inference/server.py
def start(
self,
model_path: str = "datamentorshf/groot-g1-gripper-hospitality-7ds",
port: int = 5555,
embodiment_tag: str = "UNITREE_G1",
background: bool = True,
) -> bool:
"""Start the GROOT inference server.
Args:
model_path: HuggingFace model repo or local path.
port: Port to run server on.
embodiment_tag: Embodiment tag for the model.
background: Run in background with nohup.
Returns:
True if server started successfully.
"""
# Check if already running
status = self.status()
if status.running:
print(f"Server already running (PID: {status.pid})")
return True
# Build start command
cmd = (
f"cd /workspace/Isaac-GR00T && "
f"source /opt/conda/etc/profile.d/conda.sh && "
f"conda activate grootenv && "
f"python gr00t/eval/run_gr00t_servProvenance: Repobility (https://repobility.com) — every score reproducible from /scan/
GrootServerManager.stop method · python · L171-L183 (13 LOC)src/dm_isaac_g1/inference/server.py
def stop(self) -> bool:
    """Kill the GROOT inference server process on the remote host.

    Returns:
        True if the kill command was issued without raising, False if any
        exception occurred (the error is printed).
    """
    stopped = False
    try:
        # "|| true" keeps the remote command from failing when nothing matches.
        self._ssh_exec("pkill -f 'run_gr00t_server' || true")
        print("Server stopped")
        stopped = True
    except Exception as err:
        print(f"Error stopping server: {err}")
    return stopped
# GrootServerManager.restart method · python · L185-L205 (21 LOC)src/dm_isaac_g1/inference/server.py
def restart(
    self,
    model_path: Optional[str] = None,
    port: int = 5555,
    *,
    wait_seconds: float = 2.0,
) -> bool:
    """Stop the server, wait briefly, then start it again.

    Args:
        model_path: Model to load. When None (or empty) the default model
            ``datamentorshf/groot-g1-gripper-hospitality-7ds`` is used; the
            previously running model is not looked up.
        port: Port to run on.
        wait_seconds: Pause between stop and start so the old process can
            terminate. Values <= 0 skip the pause.

    Returns:
        The result of ``start``: True if restart successful.
    """
    import time

    # The outcome of stop() is intentionally not checked; start() decides
    # whether the restart succeeded.
    self.stop()
    if wait_seconds > 0:
        time.sleep(wait_seconds)

    model = model_path or "datamentorshf/groot-g1-gripper-hospitality-7ds"
    return self.start(model_path=model, port=port)
# GrootServerManager.get_logs method · python · L207-L216 (10 LOC)src/dm_isaac_g1/inference/server.py
def get_logs(self, lines: int = 50) -> str:
    """Fetch the tail of the server log from the remote host.

    Args:
        lines: Number of trailing lines to retrieve.

    Returns:
        Output of the remote ``tail`` command, or the text produced by the
        ``echo 'No logs'`` fallback when ``tail`` fails.
    """
    log_path = "/tmp/groot_server.log"
    remote_cmd = f"tail -{lines} {log_path} 2>/dev/null || echo 'No logs'"
    return self._ssh_exec(remote_cmd)