← back to datamentors__dm-isaac-g1

Function bodies 101 total

All specs Real LLM only Function bodies
_to_list function · python · L114-L122 (9 LOC)
src/dm_isaac_g1/data/convert.py
def _to_list(val: Any) -> List[float]:
    """Convert various types to float list."""
    if val is None:
        return []
    if hasattr(val, "tolist"):
        val = val.tolist()
    if isinstance(val, (int, float)):
        return [float(val)]
    return [float(x) for x in val]
detect_format function · python · L130-L189 (60 LOC)
src/dm_isaac_g1/data/convert.py
def detect_format(info: Dict) -> Dict[str, Any]:
    """Detect dataset format from info.json.

    Args:
        info: Loaded info.json dictionary.

    Returns:
        Dictionary with format details (format, hand_type, state_shape, etc.).
    """
    features = info.get("features", {})
    feature_keys = list(features.keys())

    result = {
        "format": "unknown",
        "hand_type": "gripper",
        "has_body": False,
        "has_separate_parts": False,
        "has_observation_state": False,
        "state_shape": None,
        "joint_names": [],
    }

    # Check for observation.state (combined format)
    if "observation.state" in features:
        result["has_observation_state"] = True
        result["state_shape"] = features["observation.state"].get("shape", [])
        names = features["observation.state"].get("names", [])
        if names and isinstance(names[0], list):
            names = names[0]
        result["joint_names"] = names

        if result["state_sha
_process_hospitality_row function · python · L197-L234 (38 LOC)
src/dm_isaac_g1/data/convert.py
def _process_hospitality_row(row: Dict, hand_type: str) -> Tuple[List[float], List[float]]:
    """Process a row from hospitality format."""
    # Get body state (29 DOF)
    body = _to_list(row.get("observation.body", []))
    if len(body) < 29:
        body = body + [0.0] * (29 - len(body))
    elif len(body) > 29:
        body = body[:29]

    # Get grippers
    left_grip = _to_list(row.get("observation.left_gripper", [0.0]))
    right_grip = _to_list(row.get("observation.right_gripper", [0.0]))

    # Convert to Inspire
    left_inspire = gripper_to_inspire(left_grip[0] if left_grip else 0.0)
    right_inspire = gripper_to_inspire(right_grip[0] if right_grip else 0.0)

    obs_state = body + left_inspire + right_inspire

    # Actions
    left_arm_action = _to_list(row.get("action.left_arm", []))
    right_arm_action = _to_list(row.get("action.right_arm", []))
    left_grip_action = _to_list(row.get("action.left_gripper", [0.0]))
    right_grip_action = _to_list(row.get("action.rig
_process_dex3_row function · python · L237-L271 (35 LOC)
src/dm_isaac_g1/data/convert.py
def _process_dex3_row(row: Dict) -> Tuple[List[float], List[float]]:
    """Process a row from Dex3 combined format (28 DOF)."""
    obs_state_28 = _to_list(row.get("observation.state", [0.0] * 28))
    action_28 = _to_list(row.get("action", [0.0] * 28))

    obs_state_28 = obs_state_28[:28] + [0.0] * max(0, 28 - len(obs_state_28))
    action_28 = action_28[:28] + [0.0] * max(0, 28 - len(action_28))

    # Parse 28 DOF: [left_arm(7), right_arm(7), left_dex3(7), right_dex3(7)]
    left_arm = obs_state_28[0:7]
    right_arm = obs_state_28[7:14]
    left_dex3 = obs_state_28[14:21]
    right_dex3 = obs_state_28[21:28]

    # Build 53 DOF: legs(12) + waist(3) + arms(14) + hands(24)
    body_29 = [0.0] * 15 + left_arm + right_arm

    left_inspire = dex3_to_inspire(left_dex3)
    right_inspire = dex3_to_inspire(right_dex3)

    obs_state = body_29 + left_inspire + right_inspire

    # Same for actions
    left_arm_action = action_28[0:7]
    right_arm_action = action_28[7:14]
    left_dex3_a
_process_trifinger_row function · python · L274-L311 (38 LOC)
src/dm_isaac_g1/data/convert.py
def _process_trifinger_row(row: Dict) -> Tuple[List[float], List[float]]:
    """Process a row from teleop tri-finger format (43 DOF)."""
    obs_state_43 = _to_list(row.get("observation.state", [0.0] * 43))
    action_43 = _to_list(row.get("action", [0.0] * 43))

    obs_state_43 = obs_state_43[:43] + [0.0] * max(0, 43 - len(obs_state_43))
    action_43 = action_43[:43] + [0.0] * max(0, 43 - len(action_43))

    # Parse 43 DOF
    legs = obs_state_43[0:12]
    waist = obs_state_43[12:15]
    left_arm = obs_state_43[15:22]
    left_trifinger = obs_state_43[22:29]
    right_arm = obs_state_43[29:36]
    right_trifinger = obs_state_43[36:43]

    body_29 = legs + waist + left_arm + right_arm

    left_inspire = trifinger_to_inspire(left_trifinger)
    right_inspire = trifinger_to_inspire(right_trifinger)

    obs_state = body_29 + left_inspire + right_inspire

    # Same for actions
    legs_action = action_43[0:12]
    waist_action = action_43[12:15]
    left_arm_action = action_43[15:2
convert_to_inspire function · python · L319-L414 (96 LOC)
src/dm_isaac_g1/data/convert.py
def convert_to_inspire(
    input_path: Path,
    output_path: Path,
    hand_type: Optional[str] = None,
    copy_videos: bool = True,
    dry_run: bool = False,
) -> bool:
    """Convert dataset to 53 DOF Inspire format.

    Args:
        input_path: Path to input dataset.
        output_path: Path for output dataset.
        hand_type: Override detected hand type ("gripper", "dex3", "trifinger").
        copy_videos: Whether to copy video files.
        dry_run: If True, only analyze without converting.

    Returns:
        True if conversion successful.
    """
    if pq is None:
        raise ImportError("pyarrow required: pip install pyarrow")

    input_path = Path(input_path)
    output_path = Path(output_path)

    print(f"\n{'='*60}")
    print("Converting to Inspire format (53 DOF)")
    print(f"{'='*60}")
    print(f"Input:  {input_path}")
    print(f"Output: {output_path}")

    # Load info.json
    info_path = input_path / "meta" / "info.json"
    if not info_path.exist
_process_parquet_file function · python · L417-L468 (52 LOC)
src/dm_isaac_g1/data/convert.py
def _process_parquet_file(
    input_file: Path,
    output_file: Path,
    format_info: Dict,
    info: Dict,
) -> int:
    """Process a single parquet file."""
    table = pq.read_table(input_file)
    cols = table.column_names
    num_rows = table.num_rows
    data = {col: table.column(col).to_pylist() for col in cols}

    output_data = {
        "observation.state": [],
        "action": [],
    }

    # Passthrough columns
    passthrough = ["frame_index", "episode_index", "timestamp", "task", "index", "task_index"]
    for col in cols:
        if any(p in col.lower() for p in passthrough):
            output_data[col] = data[col]
        elif "observation.images" in col:
            output_data[col] = data[col]

    fmt = format_info["format"]

    for i in range(num_rows):
        row = {col: data[col][i] for col in data}

        if fmt == "hospitality":
            obs_state, action = _process_hospitality_row(row, format_info["hand_type"])
        elif fmt == "dex3_combined":
Repobility — same analyzer, your code, free for public repos · /scan/
_generate_metadata function · python · L471-L546 (76 LOC)
src/dm_isaac_g1/data/convert.py
def _generate_metadata(
    output_path: Path,
    info: Dict,
    total_frames: int,
    num_episodes: int,
    format_info: Dict,
):
    """Generate metadata files for converted dataset."""
    meta_dir = output_path / "meta"
    meta_dir.mkdir(parents=True, exist_ok=True)

    # Get video features
    video_features = {
        k: v for k, v in info.get("features", {}).items()
        if "observation.images" in k
    }

    new_info = {
        "codebase_version": "v2.1",
        "robot_type": "unitree_g1_inspire",
        "total_episodes": num_episodes,
        "total_frames": total_frames,
        "fps": info.get("fps", 30),
        "splits": {"train": f"0:{num_episodes}"},
        "data_path": "data/chunk-000/episode_{episode_index:06d}.parquet",
        "features": {
            "observation.state": {
                "dtype": "float32",
                "shape": [53],
                "description": "G1 body (29 DOF) + Inspire hands (24 DOF)",
            },
            "action": 
_copy_support_files function · python · L549-L571 (23 LOC)
src/dm_isaac_g1/data/convert.py
def _copy_support_files(input_path: Path, output_path: Path, copy_videos: bool):
    """Copy support files (episodes.jsonl, tasks.jsonl, videos)."""
    import shutil

    meta_in = input_path / "meta"
    meta_out = output_path / "meta"
    meta_out.mkdir(parents=True, exist_ok=True)

    for filename in ["episodes.jsonl", "tasks.jsonl"]:
        src = meta_in / filename
        if src.exists():
            shutil.copy2(src, meta_out / filename)
            print(f"  Copied {filename}")

    if copy_videos:
        video_dir = input_path / "videos"
        if video_dir.exists():
            output_video = output_path / "videos"
            if output_video.exists():
                shutil.rmtree(output_video)
            print(f"  Copying videos...")
            shutil.copytree(video_dir, output_video)
            print(f"  Copied videos")
_to_np function · python · L75-L82 (8 LOC)
src/dm_isaac_g1/data/convert_to_groot.py
def _to_np(val, expected_len: int) -> np.ndarray:
    """Convert value to numpy array, padding/truncating to expected length."""
    if val is None:
        return np.zeros(expected_len, dtype=np.float32)
    arr = np.asarray(val, dtype=np.float32).flatten()
    if len(arr) < expected_len:
        arr = np.pad(arr, (0, expected_len - len(arr)))
    return arr[:expected_len]
process_row function · python · L85-L126 (42 LOC)
src/dm_isaac_g1/data/convert_to_groot.py
def process_row(row: dict) -> tuple:
    """Convert a single row from hospitality format to UNITREE_G1 format.

    Returns:
        (observation_state, action) as numpy arrays.
    """
    # --- Build observation.state (31 DOF) ---
    body = _to_np(row.get("observation.body"), 29)

    left_gripper = _to_np(row.get("observation.left_gripper"), 1)
    right_gripper = _to_np(row.get("observation.right_gripper"), 1)

    # body layout: [left_leg(6), right_leg(6), waist(3), left_arm(7), right_arm(7)]
    obs_state = np.concatenate([
        body,           # 29 DOF (legs + waist + arms)
        left_gripper,   # 1 DOF
        right_gripper,  # 1 DOF
    ])  # Total: 31

    # --- Build action (23 DOF) ---
    left_arm_action = _to_np(row.get("action.left_arm"), 7)
    right_arm_action = _to_np(row.get("action.right_arm"), 7)
    left_grip_action = _to_np(row.get("action.left_gripper"), 1)
    right_grip_action = _to_np(row.get("action.right_gripper"), 1)
    body_action = _to_np(row.get(
convert_parquet_file function · python · L129-L175 (47 LOC)
src/dm_isaac_g1/data/convert_to_groot.py
def convert_parquet_file(
    input_file: Path,
    output_file: Path,
    ego_camera: str,
) -> int:
    """Convert a single parquet file."""
    table = pq.read_table(input_file)
    cols = table.column_names
    num_rows = table.num_rows

    data = {col: table.column(col).to_pylist() for col in cols}

    obs_states = []
    actions = []

    for i in range(num_rows):
        row = {col: data[col][i] for col in data}
        obs_state, action = process_row(row)
        obs_states.append(obs_state.tolist())
        actions.append(action.tolist())

    # Build output columns
    output_data = {
        "observation.state": obs_states,
        "action": actions,
    }

    # Passthrough metadata columns
    passthrough_cols = [
        "frame_index", "episode_index", "timestamp",
        "index", "task_index",
    ]
    for col in passthrough_cols:
        if col in data:
            output_data[col] = data[col]

    # Keep only the ego camera image reference, renamed to ego_view
    
generate_modality_json function · python · L178-L214 (37 LOC)
src/dm_isaac_g1/data/convert_to_groot.py
def generate_modality_json(output_path: Path, ego_camera: str):
    """Generate modality.json for UNITREE_G1 embodiment."""
    modality = {
        "state": {
            name: {
                "start": info["start"],
                "end": info["end"],
                "original_key": "observation.state",
            }
            for name, info in STATE_LAYOUT.items()
        },
        "action": {
            name: {
                "start": info["start"],
                "end": info["end"],
                "original_key": "action",
            }
            for name, info in ACTION_LAYOUT.items()
        },
        "video": {
            "ego_view": {
                "original_key": "observation.images.ego_view",
            },
        },
        "annotation": {
            "human.task_description": {
                "original_key": "task_index",
            },
        },
    }

    meta_dir = output_path / "meta"
    meta_dir.mkdir(parents=True, exist_ok=True)
    with open(meta_
generate_info_json function · python · L217-L290 (74 LOC)
src/dm_isaac_g1/data/convert_to_groot.py
def generate_info_json(
    output_path: Path,
    source_info: dict,
    total_frames: int,
    num_episodes: int,
    ego_camera: str,
):
    """Generate info.json for converted dataset."""
    ego_source_key = f"observation.images.{ego_camera}"
    ego_video_key = "observation.images.ego_view"
    source_features = source_info.get("features", {})
    video_info = source_features.get(ego_source_key, {})

    new_info = {
        "codebase_version": "v2.1",
        "robot_type": "unitree_g1",
        "total_episodes": num_episodes,
        "total_frames": total_frames,
        "total_tasks": source_info.get("total_tasks", 1),
        "total_videos": num_episodes,
        "total_chunks": source_info.get("total_chunks", 1),
        "chunks_size": source_info.get("chunks_size", 1000),
        "fps": source_info.get("fps", 30),
        "splits": source_info.get("splits", {"train": f"0:{num_episodes}"}),
        "data_path": "data/chunk-{episode_chunk:03d}/episode_{episode_index:06d}.parqu
convert_dataset function · python · L293-L419 (127 LOC)
src/dm_isaac_g1/data/convert_to_groot.py
def convert_dataset(
    input_path: Path,
    output_path: Path,
    ego_camera: str = "cam_left_high",
    dry_run: bool = False,
) -> bool:
    """Convert hospitality dataset to GR00T UNITREE_G1 format.

    Args:
        input_path: Path to input hospitality dataset.
        output_path: Path for output GR00T-formatted dataset.
        ego_camera: Camera to use as ego_view (default: cam_left_high).
        dry_run: If True, analyze only.

    Returns:
        True if successful.
    """
    if pq is None:
        raise ImportError("pyarrow required: pip install pyarrow")

    input_path = Path(input_path)
    output_path = Path(output_path)

    print(f"\n{'='*60}")
    print("Converting to GR00T UNITREE_G1 format")
    print(f"{'='*60}")
    print(f"Input:      {input_path}")
    print(f"Output:     {output_path}")
    print(f"Ego camera: {ego_camera}")

    # Load source info
    info_path = input_path / "meta" / "info.json"
    if not info_path.exists():
        print(f"ERROR: {
Provenance: Repobility (https://repobility.com) — every score reproducible from /scan/
main function · python · L422-L442 (21 LOC)
src/dm_isaac_g1/data/convert_to_groot.py
def main():
    import argparse

    parser = argparse.ArgumentParser(
        description="Convert hospitality dataset to GR00T UNITREE_G1 format"
    )
    parser.add_argument("--input", required=True, help="Input dataset path")
    parser.add_argument("--output", required=True, help="Output dataset path")
    parser.add_argument(
        "--ego-camera", default="cam_left_high",
        help="Camera to use as ego_view (default: cam_left_high)"
    )
    parser.add_argument("--dry-run", action="store_true", help="Analyze only")
    args = parser.parse_args()

    success = convert_dataset(
        Path(args.input), Path(args.output),
        ego_camera=args.ego_camera,
        dry_run=args.dry_run,
    )
    return 0 if success else 1
download_dataset function · python · L25-L100 (76 LOC)
src/dm_isaac_g1/data/download.py
def download_dataset(
    repo_id: str,
    output_dir: Path,
    use_lfs: bool = True,
    hf_token: Optional[str] = None,
) -> Path:
    """Download a dataset from HuggingFace.

    Args:
        repo_id: HuggingFace repository ID (e.g., "unitreerobotics/G1_Fold_Towel").
        output_dir: Directory to download to.
        use_lfs: Use git LFS for large files (recommended for video datasets).
        hf_token: HuggingFace token for private repos.

    Returns:
        Path to downloaded dataset.

    Raises:
        RuntimeError: If download fails.
    """
    output_dir = Path(output_dir)
    output_dir.mkdir(parents=True, exist_ok=True)

    # Extract dataset name from repo_id
    dataset_name = repo_id.split("/")[-1]
    dataset_path = output_dir / dataset_name

    if dataset_path.exists():
        print(f"Dataset already exists: {dataset_path}")
        return dataset_path

    if use_lfs:
        # Use git clone with LFS (better for large files)
        print(f"Cloning {repo_i
download_hospitality_datasets function · python · L103-L135 (33 LOC)
src/dm_isaac_g1/data/download.py
def download_hospitality_datasets(
    output_dir: Path,
    datasets: Optional[List[str]] = None,
    use_lfs: bool = True,
    hf_token: Optional[str] = None,
) -> List[Path]:
    """Download all hospitality datasets.

    Args:
        output_dir: Directory to download to.
        datasets: Specific datasets to download. Downloads all if None.
        use_lfs: Use git LFS.
        hf_token: HuggingFace token.

    Returns:
        List of paths to downloaded datasets.
    """
    datasets = datasets or HOSPITALITY_DATASETS
    downloaded = []

    for repo_id in datasets:
        try:
            path = download_dataset(
                repo_id=repo_id,
                output_dir=output_dir,
                use_lfs=use_lfs,
                hf_token=hf_token,
            )
            downloaded.append(path)
        except Exception as e:
            print(f"Failed to download {repo_id}: {e}")

    return downloaded
download_dex3_datasets function · python · L138-L167 (30 LOC)
src/dm_isaac_g1/data/download.py
def download_dex3_datasets(
    output_dir: Path,
    use_lfs: bool = True,
    hf_token: Optional[str] = None,
) -> List[Path]:
    """Download Dex3 datasets.

    Args:
        output_dir: Directory to download to.
        use_lfs: Use git LFS.
        hf_token: HuggingFace token.

    Returns:
        List of paths to downloaded datasets.
    """
    downloaded = []

    for repo_id in DEX3_DATASETS:
        try:
            path = download_dataset(
                repo_id=repo_id,
                output_dir=output_dir,
                use_lfs=use_lfs,
                hf_token=hf_token,
            )
            downloaded.append(path)
        except Exception as e:
            print(f"Failed to download {repo_id}: {e}")

    return downloaded
compute_stats function · python · L15-L108 (94 LOC)
src/dm_isaac_g1/data/stats.py
def compute_stats(
    dataset_path: Path,
    keys: Optional[List[str]] = None,
    output_file: Optional[Path] = None,
) -> Dict[str, Dict[str, List[float]]]:
    """Compute normalization statistics for dataset.

    Computes mean, std, min, max for observation.state and action columns.

    Args:
        dataset_path: Path to dataset.
        keys: Specific columns to compute stats for. Defaults to
              ["observation.state", "action"].
        output_file: If provided, saves stats to this file.

    Returns:
        Dictionary mapping column names to their statistics.
    """
    if pq is None:
        raise ImportError("pyarrow required: pip install pyarrow")

    dataset_path = Path(dataset_path)
    keys = keys or ["observation.state", "action"]

    # Find all parquet files
    parquet_files = sorted((dataset_path / "data").rglob("*.parquet"))

    if not parquet_files:
        raise ValueError(f"No parquet files found in {dataset_path / 'data'}")

    print(f"Computing
load_stats function · python · L111-L126 (16 LOC)
src/dm_isaac_g1/data/stats.py
def load_stats(dataset_path: Path) -> Dict[str, Dict[str, List[float]]]:
    """Load statistics from stats.json.

    Args:
        dataset_path: Path to dataset.

    Returns:
        Statistics dictionary.
    """
    stats_file = Path(dataset_path) / "meta" / "stats.json"

    if not stats_file.exists():
        raise FileNotFoundError(f"Stats file not found: {stats_file}")

    with open(stats_file) as f:
        return json.load(f)
normalize function · python · L129-L155 (27 LOC)
src/dm_isaac_g1/data/stats.py
def normalize(
    data: np.ndarray,
    stats: Dict[str, List[float]],
    method: str = "standard",
) -> np.ndarray:
    """Normalize data using computed statistics.

    Args:
        data: Data array to normalize.
        stats: Statistics dictionary with mean, std, min, max.
        method: Normalization method ("standard" or "minmax").

    Returns:
        Normalized data array.
    """
    if method == "standard":
        mean = np.array(stats["mean"])
        std = np.array(stats["std"])
        return (data - mean) / std

    elif method == "minmax":
        min_val = np.array(stats["min"])
        max_val = np.array(stats["max"])
        return (data - min_val) / (max_val - min_val + 1e-6)

    else:
        raise ValueError(f"Unknown normalization method: {method}")
denormalize function · python · L158-L184 (27 LOC)
src/dm_isaac_g1/data/stats.py
def denormalize(
    data: np.ndarray,
    stats: Dict[str, List[float]],
    method: str = "standard",
) -> np.ndarray:
    """Denormalize data using computed statistics.

    Args:
        data: Normalized data array.
        stats: Statistics dictionary.
        method: Normalization method used.

    Returns:
        Denormalized data array.
    """
    if method == "standard":
        mean = np.array(stats["mean"])
        std = np.array(stats["std"])
        return data * std + mean

    elif method == "minmax":
        min_val = np.array(stats["min"])
        max_val = np.array(stats["max"])
        return data * (max_val - min_val) + min_val

    else:
        raise ValueError(f"Unknown normalization method: {method}")
Repobility's GitHub App fixes findings like these · https://github.com/apps/repobility-bot
validate_dataset function · python · L24-L123 (100 LOC)
src/dm_isaac_g1/data/validate.py
def validate_dataset(dataset_path: Path) -> ValidationResult:
    """Validate dataset structure and contents.

    Checks for:
    - Required meta files (info.json, modality.json, episodes.jsonl, tasks.jsonl)
    - Parquet file existence and schema
    - Video file existence (if referenced)
    - Consistency between meta files and data

    Args:
        dataset_path: Path to dataset directory.

    Returns:
        ValidationResult with validation status and any issues found.
    """
    dataset_path = Path(dataset_path)
    errors = []
    warnings = []
    info = {}

    # Check directory exists
    if not dataset_path.exists():
        return ValidationResult(
            valid=False,
            errors=[f"Dataset path does not exist: {dataset_path}"],
        )

    meta_dir = dataset_path / "meta"
    data_dir = dataset_path / "data"

    # Check meta directory
    if not meta_dir.exists():
        errors.append("Missing meta/ directory")
    else:
        # Check required meta f
fix_episodes function · python · L126-L219 (94 LOC)
src/dm_isaac_g1/data/validate.py
def fix_episodes(
    dataset_path: Path,
    task_description: Optional[str] = None,
) -> bool:
    """Create or fix episodes.jsonl and tasks.jsonl files.

    Args:
        dataset_path: Path to dataset.
        task_description: Task description to use. Auto-detected if None.

    Returns:
        True if files were created/fixed successfully.
    """
    if pq is None:
        raise ImportError("pyarrow required: pip install pyarrow")

    dataset_path = Path(dataset_path)
    meta_dir = dataset_path / "meta"
    meta_dir.mkdir(parents=True, exist_ok=True)

    episodes_file = meta_dir / "episodes.jsonl"
    tasks_file = meta_dir / "tasks.jsonl"

    # Load info.json
    info_file = meta_dir / "info.json"
    if not info_file.exists():
        print(f"ERROR: info.json not found in {meta_dir}")
        return False

    with open(info_file) as f:
        info = json.load(f)

    total_episodes = info.get("total_episodes", 0)

    # Count parquet files if total_episodes not set
    i
validate_config function · python · L197-L216 (20 LOC)
src/dm_isaac_g1/finetuning/configs/g1_inspire_53dof.py
def validate_config():
    """Validate the configuration dimensions."""
    total_dof = 0
    for key, (start, end) in JOINT_INDEX_RANGES.items():
        dof = end - start
        total_dof += dof
        joint_count = len(G1_INSPIRE_JOINT_NAMES[key])
        assert dof == joint_count, f"{key}: index range {dof} != joint count {joint_count}"

    assert total_dof == 53, f"Total DOF should be 53, got {total_dof}"
    print(f"Configuration validated: {total_dof} DOF")

    # Print summary
    print("\nJoint Configuration Summary:")
    print("-" * 40)
    for key, joints in G1_INSPIRE_JOINT_NAMES.items():
        start, end = JOINT_INDEX_RANGES[key]
        print(f"  {key}: {len(joints)} DOF (indices {start}-{end-1})")
    print("-" * 40)
    print(f"  TOTAL: {total_dof} DOF")
build_finetune_command function · python · L110-L167 (58 LOC)
src/dm_isaac_g1/finetuning/launcher.py
def build_finetune_command(args: FinetuneArgs) -> list[str]:
    """Build the torchrun command to launch fine-tuning.

    Args:
        args: Fine-tuning arguments

    Returns:
        Command as list of strings
    """
    if not args.datasets:
        raise ValueError("At least one dataset path must be provided in args.datasets")

    script = "/workspace/Isaac-GR00T/gr00t/experiment/launch_finetune.py"

    # Use torchrun for distributed training (matching official approach)
    cmd = [
        "torchrun",
        f"--nproc_per_node={args.num_gpus}",
        "--master_port=29500",
        script,
        "--base_model_path", args.base_model,
        "--dataset_path", args.datasets[0],
        "--embodiment_tag", args.embodiment_tag,
        "--output_dir", args.output,
        "--max_steps", str(args.max_steps),
        "--save_steps", str(args.save_steps),
        "--global_batch_size", str(args.batch_size),
        "--learning_rate", str(args.learning_rate),
        "--weight_de
launch_finetune function · python · L170-L200 (31 LOC)
src/dm_isaac_g1/finetuning/launcher.py
def launch_finetune(args: FinetuneArgs, dry_run: bool = False) -> int:
    """Launch fine-tuning job.

    Args:
        args: Fine-tuning arguments
        dry_run: If True, print command but don't execute

    Returns:
        Return code from subprocess (0 = success)
    """
    cmd = build_finetune_command(args)

    print("=" * 60)
    print("GROOT Fine-tuning Command:")
    print(" ".join(cmd))
    print("=" * 60)

    if dry_run:
        print("(dry run - not executing)")
        return 0

    # Check if Isaac-GR00T is available
    groot_path = Path("/workspace/Isaac-GR00T")
    if not groot_path.exists():
        print(f"ERROR: Isaac-GR00T not found at {groot_path}")
        print("This script must be run on the workstation with Isaac-GR00T installed.")
        return 1

    # Launch training
    result = subprocess.run(cmd, cwd=str(groot_path))
    return result.returncode
main function · python · L203-L271 (69 LOC)
src/dm_isaac_g1/finetuning/launcher.py
def main():
    """CLI entry point."""
    import argparse

    parser = argparse.ArgumentParser(
        description="Launch GROOT fine-tuning via torchrun",
        formatter_class=argparse.ArgumentDefaultsHelpFormatter
    )

    # Required
    parser.add_argument("--base-model", default="nvidia/GR00T-N1.6-3B",
                       help="Base model path or HuggingFace ID")
    parser.add_argument("--datasets", nargs="+", required=True,
                       help="Dataset path (first dataset used)")
    parser.add_argument("--config", default="",
                       help="Modality config file (empty for pre-registered embodiments)")
    parser.add_argument("--output", default="./checkpoints/groot_finetuned",
                       help="Output directory")

    # Training
    parser.add_argument("--max-steps", type=int, default=10000)
    parser.add_argument("--save-steps", type=int, default=2000)
    parser.add_argument("--save-total-limit", type=int, default=2)
    parser.add_
GrootClient.__init__ method · python · L110-L133 (24 LOC)
src/dm_isaac_g1/inference/client.py
    def __init__(
        self,
        host: Optional[str] = None,
        port: Optional[int] = None,
        timeout: float = 30.0,
        config: Optional[Config] = None,
    ):
        """Initialize the GROOT client.

        Args:
            host: GROOT server host. Defaults to config or 192.168.1.237.
            port: GROOT server port. Defaults to config or 5555.
            timeout: Request timeout in seconds.
            config: Optional Config instance for settings.
        """
        if config is None:
            from dm_isaac_g1.core.config import load_config
            config = load_config()

        self.host = host or config.groot_server_host
        self.port = port or config.groot_server_port
        self.timeout_ms = int(timeout * 1000)
        self._context = zmq.Context()
        self._socket: Optional[zmq.Socket] = None
GrootClient._connect method · python · L135-L142 (8 LOC)
src/dm_isaac_g1/inference/client.py
    def _connect(self):
        """Establish ZeroMQ connection."""
        if self._socket is None:
            self._socket = self._context.socket(zmq.REQ)
            self._socket.setsockopt(zmq.RCVTIMEO, self.timeout_ms)
            self._socket.setsockopt(zmq.SNDTIMEO, self.timeout_ms)
            self._socket.setsockopt(zmq.LINGER, 0)
            self._socket.connect(f"tcp://{self.host}:{self.port}")
Repobility · code-quality intelligence platform · https://repobility.com
GrootClient.health_check method · python · L144-L165 (22 LOC)
src/dm_isaac_g1/inference/client.py
    def health_check(self) -> bool:
        """Check if the GROOT server is healthy.

        Returns:
            True if server is responsive, False otherwise.
        """
        try:
            # Use a separate socket with short timeout for health check
            test_socket = self._context.socket(zmq.REQ)
            test_socket.setsockopt(zmq.RCVTIMEO, 3000)  # 3 second timeout
            test_socket.setsockopt(zmq.SNDTIMEO, 3000)
            test_socket.setsockopt(zmq.LINGER, 0)
            test_socket.connect(f"tcp://{self.host}:{self.port}")

            # Send ping request
            request = {"endpoint": "ping"}
            test_socket.send(MsgSerializer.to_bytes(request))
            test_socket.recv()  # Wait for any response
            test_socket.close()
            return True
        except zmq.ZMQError:
            return False
GrootClient.get_action method · python · L167-L310 (144 LOC)
src/dm_isaac_g1/inference/client.py
    def get_action(
        self,
        observation: np.ndarray,
        images: Optional[dict[str, np.ndarray]] = None,
        image: Optional[np.ndarray] = None,
        task: Optional[str] = None,
        action_horizon: int = 16,
        execute_steps: int = 1,
        return_full_trajectory: bool = False,
    ) -> Union[np.ndarray, dict]:
        """Get action from the GROOT model.

        GR00T 1.6 predicts action chunks (typically 8-16 steps). This method
        allows configuring the action horizon at runtime for optimal performance.

        Args:
            observation: Robot state observation (28 DOF for G1+Dex3).
            images: Dict mapping camera names to RGB images (H, W, 3).
                    Expected keys: cam_left_high, cam_right_high,
                    cam_left_wrist, cam_right_wrist.
            image: Deprecated. Single RGB image sent as cam_left_high only.
                   Use `images` dict instead for multi-camera models.
            task: Optiona
GrootClient.get_policy_info method · python · L312-L321 (10 LOC)
src/dm_isaac_g1/inference/client.py
    def get_policy_info(self) -> dict:
        """Get information about the loaded policy.

        Returns:
            Dictionary with model info (name, embodiment, DOF, etc.).
        """
        self._connect()
        request = {"endpoint": "get_policy_info"}
        self._socket.send(MsgSerializer.to_bytes(request))
        return MsgSerializer.from_bytes(self._socket.recv())
GrootClient.close method · python · L323-L328 (6 LOC)
src/dm_isaac_g1/inference/client.py
    def close(self):
        """Close the ZeroMQ client."""
        if self._socket:
            self._socket.close()
            self._socket = None
        self._context.term()
GrootClientAsync.__init__ method · python · L351-L379 (29 LOC)
src/dm_isaac_g1/inference/client.py
    def __init__(
        self,
        host: Optional[str] = None,
        port: Optional[int] = None,
        timeout: float = 30.0,
        config: Optional[Config] = None,
    ):
        """Initialize async GROOT client.

        Args:
            host: GROOT server host.
            port: GROOT server port.
            timeout: Request timeout in seconds.
            config: Optional Config instance.
        """
        if config is None:
            from dm_isaac_g1.core.config import load_config
            config = load_config()

        self.host = host or config.groot_server_host
        self.port = port or config.groot_server_port
        self.timeout_ms = int(timeout * 1000)
        # Use sync client internally since zmq async requires zmq.asyncio
        self._sync_client = GrootClient(
            host=self.host,
            port=self.port,
            timeout=timeout,
            config=config,
        )
GrootClientAsync.get_action method · python · L387-L421 (35 LOC)
src/dm_isaac_g1/inference/client.py
    async def get_action(
        self,
        observation: np.ndarray,
        images: Optional[dict[str, np.ndarray]] = None,
        image: Optional[np.ndarray] = None,
        task: Optional[str] = None,
        action_horizon: int = 16,
        execute_steps: int = 1,
    ) -> np.ndarray:
        """Get action from the GROOT model asynchronously.

        Args:
            observation: Robot state observation (28 DOF for Dex3).
            images: Dict mapping camera names to RGB images.
            image: Deprecated. Single RGB image. Use `images` instead.
            task: Optional task description.
            action_horizon: Number of steps to predict (8-16 recommended).
            execute_steps: Number of steps to return for execution.

        Returns:
            Action array. Shape (28,) if execute_steps=1, else (execute_steps, 28).
        """
        import asyncio
        loop = asyncio.get_event_loop()
        return await loop.run_in_executor(
            None,
    
IsaacSimRunner.__init__ method · python · L90-L100 (11 LOC)
src/dm_isaac_g1/inference/isaac_runner.py
    def __init__(self, config: Optional[Config] = None):
        """Initialize Isaac Sim runner.

        Args:
            config: Configuration instance.
        """
        self.config = config or load_config()
        self.workstation = WorkstationConnection(config=self.config)
        self.groot_client: Optional[GrootClient] = None
        self.current_env: Optional[IsaacEnv] = None
        self._session_id: Optional[str] = None
IsaacSimRunner.setup method · python · L102-L133 (32 LOC)
src/dm_isaac_g1/inference/isaac_runner.py
    def setup(
        self,
        env: IsaacEnv = IsaacEnv.PICK_REDBLOCK,
        groot_host: Optional[str] = None,
        groot_port: Optional[int] = None,
    ) -> bool:
        """Setup the environment and GROOT client.

        Args:
            env: Isaac environment to use.
            groot_host: GROOT server host.
            groot_port: GROOT server port.

        Returns:
            True if setup successful.
        """
        self.current_env = env

        # Connect to GROOT server
        self.groot_client = GrootClient(
            host=groot_host,
            port=groot_port,
            config=self.config,
        )

        if not self.groot_client.health_check():
            print(f"Warning: GROOT server not responding at {self.groot_client.base_url}")
            return False

        print(f"Connected to GROOT server at {self.groot_client.base_url}")
        print(f"Environment: {env.value}")
        return True
Repobility — same analyzer, your code, free for public repos · /scan/
IsaacSimRunner.run_episode method · python · L135-L197 (63 LOC)
src/dm_isaac_g1/inference/isaac_runner.py
    def run_episode(
        self,
        task: str = "Complete the manipulation task",
        max_steps: int = 500,
        action_horizon: int = 16,
        execute_steps: int = 1,
        render: bool = False,
        record: bool = False,
    ) -> EpisodeResult:
        """Run a single episode using GROOT for control.

        This executes the simulation on the workstation and uses
        the GROOT server for action inference.

        Args:
            task: Task description for language conditioning.
            max_steps: Maximum steps per episode.
            action_horizon: Number of steps GROOT predicts (8-16 recommended).
            execute_steps: Steps to execute before re-planning (1=receding horizon).
            render: Whether to render the simulation.
            record: Whether to record video.

        Returns:
            EpisodeResult with success status and metrics.
        """
        if self.groot_client is None or self.current_env is None:
            rais
IsaacSimRunner._parse_episode_result method · python · L413-L434 (22 LOC)
src/dm_isaac_g1/inference/isaac_runner.py
    def _parse_episode_result(self, stdout: str) -> EpisodeResult:
        """Parse episode result from script output.

        Args:
            stdout: Script output.

        Returns:
            EpisodeResult parsed from output.
        """
        import json

        for line in stdout.split("\n"):
            if line.startswith("RESULT:"):
                data = json.loads(line[7:])
                return EpisodeResult(
                    success=data.get("success", False),
                    total_reward=data.get("total_reward", 0.0),
                    steps=data.get("steps", 0),
                    metrics=data,
                )

        return EpisodeResult(success=False, total_reward=0.0, steps=0)
IsaacSimRunner.run_benchmark method · python · L436-L477 (42 LOC)
src/dm_isaac_g1/inference/isaac_runner.py
    def run_benchmark(
        self,
        env: IsaacEnv,
        num_episodes: int = 10,
        task: str = "Complete the manipulation task",
    ) -> Dict[str, Any]:
        """Run multiple episodes and compute statistics.

        Args:
            env: Environment to benchmark.
            num_episodes: Number of episodes to run.
            task: Task description.

        Returns:
            Dictionary with benchmark results.
        """
        self.setup(env)

        results = []
        for i in range(num_episodes):
            print(f"Episode {i+1}/{num_episodes}...")
            result = self.run_episode(task=task)
            results.append(result)
            print(f"  Success: {result.success}, Reward: {result.total_reward:.2f}")

        # Compute statistics
        successes = [r.success for r in results]
        rewards = [r.total_reward for r in results]
        steps = [r.steps for r in results]

        return {
            "environment": env.value,
           
IsaacSimRunner.list_available_envs method · python · L479-L485 (7 LOC)
src/dm_isaac_g1/inference/isaac_runner.py
    def list_available_envs(self) -> List[str]:
        """List available Isaac Sim environments.

        Returns:
            List of environment names.
        """
        return [env.value for env in IsaacEnv]
GrootServerManager.__init__ method · python · L42-L51 (10 LOC)
src/dm_isaac_g1/inference/server.py
    def __init__(self, config: Optional[Config] = None):
        """Initialize server manager.

        Args:
            config: Configuration instance. Loads from .env if not provided.
        """
        self.config = config or load_config()
        self._spark_host = self.config.groot_server_host
        self._spark_user = self.config.spark_user
        self._spark_password = self.config.spark_password
GrootServerManager._ssh_exec method · python · L53-L83 (31 LOC)
src/dm_isaac_g1/inference/server.py
    def _ssh_exec(self, command: str, timeout: int = 60) -> str:
        """Execute command on Spark server via SSH.

        Args:
            command: Command to execute.
            timeout: Timeout in seconds.

        Returns:
            Command output.
        """
        import subprocess

        ssh_cmd = [
            "sshpass",
            "-p",
            self._spark_password,
            "ssh",
            "-o",
            "StrictHostKeyChecking=no",
            f"{self._spark_user}@{self._spark_host}",
            command,
        ]

        result = subprocess.run(
            ssh_cmd,
            capture_output=True,
            text=True,
            timeout=timeout,
        )

        return result.stdout
GrootServerManager.status method · python · L85-L123 (39 LOC)
src/dm_isaac_g1/inference/server.py
    def status(self) -> ServerStatus:
        """Get current server status.

        Returns:
            ServerStatus with running state and model info.
        """
        try:
            # Check if server process is running
            output = self._ssh_exec("pgrep -f 'run_gr00t_server' || echo 'not_running'")

            if "not_running" in output:
                return ServerStatus(running=False)

            pid = int(output.strip().split()[0]) if output.strip() else None

            # Get GPU memory usage
            gpu_output = self._ssh_exec(
                "nvidia-smi --query-compute-apps=pid,used_memory "
                "--format=csv,noheader,nounits 2>/dev/null || echo ''"
            )

            gpu_memory = None
            if gpu_output.strip() and pid:
                for line in gpu_output.strip().split("\n"):
                    parts = line.split(",")
                    if len(parts) >= 2 and str(pid) in parts[0]:
                        gpu_memory = int(
GrootServerManager.start method · python · L125-L169 (45 LOC)
src/dm_isaac_g1/inference/server.py
    def start(
        self,
        model_path: str = "datamentorshf/groot-g1-gripper-hospitality-7ds",
        port: int = 5555,
        embodiment_tag: str = "UNITREE_G1",
        background: bool = True,
    ) -> bool:
        """Start the GROOT inference server.

        Args:
            model_path: HuggingFace model repo or local path.
            port: Port to run server on.
            embodiment_tag: Embodiment tag for the model.
            background: Run in background with nohup.

        Returns:
            True if server started successfully.
        """
        # Check if already running
        status = self.status()
        if status.running:
            print(f"Server already running (PID: {status.pid})")
            return True

        # Build start command
        cmd = (
            f"cd /workspace/Isaac-GR00T && "
            f"source /opt/conda/etc/profile.d/conda.sh && "
            f"conda activate grootenv && "
            f"python gr00t/eval/run_gr00t_serv
Provenance: Repobility (https://repobility.com) — every score reproducible from /scan/
GrootServerManager.stop method · python · L171-L183 (13 LOC)
src/dm_isaac_g1/inference/server.py
    def stop(self) -> bool:
        """Stop the GROOT inference server.

        Returns:
            True if server stopped successfully.
        """
        try:
            self._ssh_exec("pkill -f 'run_gr00t_server' || true")
            print("Server stopped")
            return True
        except Exception as e:
            print(f"Error stopping server: {e}")
            return False
GrootServerManager.restart method · python · L185-L205 (21 LOC)
src/dm_isaac_g1/inference/server.py
    def restart(
        self,
        model_path: Optional[str] = None,
        port: int = 5555,
    ) -> bool:
        """Restart the server, optionally with a new model.

        Args:
            model_path: New model to load (uses existing if None).
            port: Port to run on.

        Returns:
            True if restart successful.
        """
        self.stop()

        import time
        time.sleep(2)  # Wait for process to fully terminate

        model = model_path or "datamentorshf/groot-g1-gripper-hospitality-7ds"
        return self.start(model_path=model, port=port)
GrootServerManager.get_logs method · python · L207-L216 (10 LOC)
src/dm_isaac_g1/inference/server.py
    def get_logs(self, lines: int = 50) -> str:
        """Get recent server logs.

        Args:
            lines: Number of lines to retrieve.

        Returns:
            Log output.
        """
        return self._ssh_exec(f"tail -{lines} /tmp/groot_server.log 2>/dev/null || echo 'No logs'")
‹ prevpage 2 / 3next ›