← back to datamentors__dm-isaac-g1

Function bodies 101 total

All specs Real LLM only Function bodies
InferenceSetup.get_cameras method · python · L184-L196 (13 LOC)
scripts/inference_setups.py
    def get_cameras(self) -> list:
        """Return the cameras configured for this setup.

        When ``self.cameras`` is non-empty it is returned as-is (the same list
        object). Otherwise a one-element list is built from the legacy
        single-camera fields (``camera_parent``, ``camera_pos``,
        ``camera_rot``, ``focal_length``, ``horizontal_aperture``) under the
        name ``"cam_left_high"``.
        """
        configured = self.cameras
        if not configured:
            # No explicit camera list: wrap the legacy fields in a single spec.
            legacy_spec = CameraSpec(
                name="cam_left_high",
                camera_parent=self.camera_parent,
                pos=self.camera_pos,
                rot=self.camera_rot,
                focal_length=self.focal_length,
                horizontal_aperture=self.horizontal_aperture,
            )
            return [legacy_spec]
        return configured
InferenceSetup.get_primary_camera method · python · L198-L211 (14 LOC)
scripts/inference_setups.py
    def get_primary_camera(self) -> CameraSpec:
        """Return the primary camera of this setup.

        The primary camera is the first entry of ``get_cameras()`` whose
        ``is_duplicate`` flag is falsy. If every entry is a duplicate, a spec
        built from the legacy single-camera fields is returned instead.
        """
        not_found = object()
        primary = next(
            (spec for spec in self.get_cameras() if not spec.is_duplicate),
            not_found,
        )
        if primary is not not_found:
            return primary
        # Every listed camera is a duplicate: fall back to the legacy fields.
        return CameraSpec(
            name="cam_left_high",
            camera_parent=self.camera_parent,
            pos=self.camera_pos,
            rot=self.camera_rot,
            focal_length=self.focal_length,
            horizontal_aperture=self.horizontal_aperture,
        )
InferenceSetup.get_extra_cameras method · python · L213-L219 (7 LOC)
scripts/inference_setups.py
    def get_extra_cameras(self) -> list:
        """Return the physical cameras other than the primary one.

        Entries whose ``is_duplicate`` flag is set are never returned. The
        primary camera is the first non-duplicate entry of ``get_cameras()``
        (the rule ``get_primary_camera`` uses), so it is skipped here even when
        duplicate entries precede it in the list. Previously the first list
        entry was dropped instead, which returned the primary camera as an
        "extra" whenever a duplicate came first.

        Returns:
            A new list with the remaining non-duplicate cameras in their
            original order; empty when there is at most one physical camera.
        """
        physical = [cam for cam in self.get_cameras() if not cam.is_duplicate]
        # physical[0], when present, is the primary camera; the rest are extras.
        return physical[1:]
get_setup function · python · L623-L628 (6 LOC)
scripts/inference_setups.py
def get_setup(name: str) -> InferenceSetup:
    """Look up a registered inference setup by name.

    Args:
        name: Key of the setup in ``SETUPS``.

    Returns:
        The ``SETUPS`` entry stored under ``name``.

    Raises:
        ValueError: If ``name`` is not a key of ``SETUPS``; the message lists
            the known names in sorted order.
    """
    if name in SETUPS:
        return SETUPS[name]
    known_names = ", ".join(sorted(SETUPS))
    raise ValueError(f"Unknown setup '{name}'. Available: {known_names}")
list_setups function · python · L631-L651 (21 LOC)
scripts/inference_setups.py
def list_setups() -> str:
    """Return a formatted string listing all available setups."""
    lines = ["Available inference setups:", ""]
    for name, setup in SETUPS.items():
        lines.append(f"  {name}")
        lines.append(f"    {setup.description}")
        cams = setup.get_cameras()
        if len(cams) > 1:
            cam_names = [c.name for c in cams]
            lines.append(f"    cameras: {cam_names}")
        else:
            primary = setup.get_primary_camera()
            lines.append(f"    camera_parent={primary.camera_parent}, pos={primary.pos}, rot={primary.rot}")
        if setup.scene:
            lines.append(f"    scene: {setup.scene}")
        if setup.dof_layout:
            total_dof = max(end for _, end in setup.dof_layout.values())
            lines.append(f"    dof: {total_dof} ({', '.join(f'{k}:{e-s}' for k, (s, e) in setup.dof_layout.items())})")
        lines.append(f"    object_pos={setup.object_pos}, plate_pos={setup.plate_pos}")
        lines.ap
load_env_cfg_class function · python · L63-L68 (6 LOC)
scripts/launch_scene_ui.py
def load_env_cfg_class(scene_name: str):
    """Import and return the environment config class registered for a scene.

    Args:
        scene_name: Key into ``AVAILABLE_SCENES``; the mapped value is a dotted
            ``"package.module.ClassName"`` path.

    Returns:
        The attribute named by the last dotted component, read from the
        imported module.

    Raises:
        KeyError: If ``scene_name`` is not a key of ``AVAILABLE_SCENES``.
    """
    dotted_path = AVAILABLE_SCENES[scene_name]
    # Split once from the right: everything before the last dot is the module.
    mod_name, cls_name = dotted_path.rsplit(".", 1)
    return getattr(import_module(mod_name), cls_name)
main function · python · L71-L142 (72 LOC)
scripts/launch_scene_ui.py
def main():
    print(f"[INFO] Loading scene: {args_cli.scene}", flush=True)

    EnvCfgClass = load_env_cfg_class(args_cli.scene)
    env_cfg = EnvCfgClass()
    env_cfg.scene.num_envs = args_cli.num_envs

    # Disable curriculum if present
    if hasattr(env_cfg, 'curriculum'):
        env_cfg.curriculum = None

    # Disable any camera sensors to avoid synthetic data bugs
    # The scene's built-in cameras will still be visible in the UI viewport
    if hasattr(env_cfg.scene, 'tiled_camera'):
        env_cfg.scene.tiled_camera = None
    if hasattr(env_cfg.scene, 'front_camera'):
        env_cfg.scene.front_camera = None
    if hasattr(env_cfg.scene, 'wrist_camera_left'):
        env_cfg.scene.wrist_camera_left = None
    if hasattr(env_cfg.scene, 'wrist_camera_right'):
        env_cfg.scene.wrist_camera_right = None

    # Configure sim device
    env_cfg.sim.device = args_cli.device if hasattr(args_cli, 'device') and args_cli.device else "cuda:0"

    print("[INFO] Creating envir
Repobility · code-quality intelligence platform · https://repobility.com
load_env_cfg_class function · python · L221-L233 (13 LOC)
scripts/policy_inference_groot_g1.py
def load_env_cfg_class(scene_name: str):
    """Dynamically load the environment config class registered for a scene.

    Args:
        scene_name: Key into ``AVAILABLE_SCENES``; the mapped value is a dotted
            ``"package.module.ClassName"`` path.

    Returns:
        The attribute named by the last dotted component, read from the
        imported module.

    Raises:
        ValueError: If ``scene_name`` is not a key of ``AVAILABLE_SCENES``.
        RuntimeError: If the module cannot be imported or does not define the
            class; the underlying exception is attached as ``__cause__``.
    """
    if scene_name not in AVAILABLE_SCENES:
        raise ValueError(f"Unknown scene: {scene_name}. Available: {list(AVAILABLE_SCENES.keys())}")

    module_path = AVAILABLE_SCENES[scene_name]
    # Split once from the right: everything before the last dot is the module.
    module_name, class_name = module_path.rsplit(".", 1)

    try:
        module = import_module(module_name)
        return getattr(module, class_name)
    except (ImportError, AttributeError) as e:
        # Chain explicitly so the traceback reports the import/attribute
        # failure as the direct cause rather than as an incidental context.
        raise RuntimeError(f"Failed to load scene '{scene_name}': {e}") from e
build_flat_observation function · python · L287-L428 (142 LOC)
scripts/policy_inference_groot_g1.py
def build_flat_observation(
    camera_rgb: np.ndarray,
    joint_pos: np.ndarray,
    group_joint_ids: dict,
    state_dims: dict,
    language_cmd: str,
    video_horizon: int = 1,
    state_horizon: int = 1,
    action_joint_ids: list = None,
    joint_to_53dof_mapping: dict = None,
    extra_camera_rgbs: dict = None,
    primary_camera_name: str = "cam_left_high",
    negate_state_mask: np.ndarray = None,
    embodiment_format: str = "new_embodiment",
) -> dict:
    """
    Build observation dictionary for GROOT inference server.

    Supports two server-side formats:

    1. **new_embodiment** (28/53 DOF, Dex3/Inspire):
       Nested dicts: {"video": {"cam_left_high": ...}, "state": {"observation.state": ...}}

    2. **unitree_g1** (31 DOF state, gripper):
       Flat dot-separated keys for Gr00tSimPolicyWrapper:
       {"video.ego_view": ..., "state.left_arm": ..., "annotation.human.task_description": ...}

    Args:
        camera_rgb: Primary camera image (B, H, W, C) uint8
  
merge_datasets function · python · L24-L187 (164 LOC)
scripts/training/merge_datasets.py
def merge_datasets(input_dir: Path, output_dir: Path, dataset_names: list[str]):
    """Merge multiple GR00T datasets into one."""
    if output_dir.exists():
        shutil.rmtree(output_dir)

    output_dir.mkdir(parents=True)
    (output_dir / "meta").mkdir()
    (output_dir / "data").mkdir()

    global_ep = 0
    total_frames = 0
    all_tasks = {}
    all_episodes = []
    all_episodes_stats = []

    for ds_name in dataset_names:
        ds_path = input_dir / ds_name
        info_path = ds_path / "meta" / "info.json"
        with open(info_path) as f:
            info = json.load(f)

        n_eps = info["total_episodes"]
        print(f"\n=== {ds_name}: {n_eps} episodes ===")

        # Read task descriptions
        tasks_path = ds_path / "meta" / "tasks.jsonl"
        ds_tasks = {}
        if tasks_path.exists():
            with open(tasks_path) as f:
                for line in f:
                    t = json.loads(line)
                    ds_tasks[t["task_index"]] = t["ta
main function · python · L190-L218 (29 LOC)
scripts/training/merge_datasets.py
def main():
    """Parse command-line options and merge the selected GR00T datasets.

    Options:
        --input-dir: Directory containing the individual converted datasets.
        --output: Path of the merged dataset.
        --datasets: One or more dataset names to merge.
    """
    default_datasets = [
        "G1_Fold_Towel",
        "G1_Clean_Table",
        "G1_Wipe_Table",
        "G1_Prepare_Fruit",
        "G1_Pour_Medicine",
        "G1_Organize_Tools",
        "G1_Pack_PingPong",
    ]

    cli = argparse.ArgumentParser(description="Merge GR00T datasets")
    cli.add_argument(
        "--input-dir",
        default="/workspace/datasets/groot",
        help="Directory containing individual converted datasets",
    )
    cli.add_argument(
        "--output",
        default="/workspace/datasets/groot_merged",
        help="Output merged dataset path",
    )
    cli.add_argument(
        "--datasets",
        nargs="+",
        default=default_datasets,
        help="Dataset names to merge",
    )
    options = cli.parse_args()

    source_root = Path(options.input_dir)
    merged_root = Path(options.output)
    merge_datasets(source_root, merged_root, options.datasets)
main function · python · L11-L17 (7 LOC)
src/dm_isaac_g1/cli.py
def main():
    """DM-ISAAC-G1: G1 Robot Training Suite.

    Fine-tuning, Inference, Imitation Learning, and Reinforcement Learning
    for the Unitree G1 EDU 2 robot with UNITREE_G1 Gripper Hands.
    """
    # Deliberately does nothing. NOTE(review): presumably the root click group
    # (decorators are not visible here), in which case the docstring above is
    # the CLI help text and must not be reworded casually.
    return None
download function · python · L35-L50 (16 LOC)
src/dm_isaac_g1/cli.py
def download(repo_id: str, output: str, lfs: bool):
    """Download a dataset from HuggingFace.

    Example: dm-g1 data download unitreerobotics/G1_Fold_Towel
    """
    # Deferred import: resolved only when this command actually runs.
    from dm_isaac_g1.data.download import download_dataset

    destination = Path(output)
    click.echo(f"Downloading {repo_id} to {destination}...")

    try:
        saved_to = download_dataset(repo_id, destination, use_lfs=lfs)
        click.echo(f"Downloaded to: {saved_to}")
    except Exception as e:
        # Any failure is reported on stderr and mapped to exit status 1.
        click.echo(f"Error: {e}", err=True)
        raise SystemExit(1)
convert function · python · L58-L73 (16 LOC)
src/dm_isaac_g1/cli.py
def convert(input_path: str, output_path: str, hand_type: Optional[str], dry_run: bool):
    """Convert dataset to 53 DOF Inspire format.

    Example: dm-g1 data convert ./G1_Fold_Towel ./G1_Fold_Towel_Inspire
    """
    # Deferred import: resolved only when this command actually runs.
    from dm_isaac_g1.data.convert import convert_to_inspire

    source_dir = Path(input_path)
    target_dir = Path(output_path)
    converted = convert_to_inspire(
        source_dir,
        target_dir,
        hand_type=hand_type,
        dry_run=dry_run,
    )

    if converted:
        return
    # A falsy result from the converter becomes exit status 1.
    raise SystemExit(1)
validate function · python · L78-L104 (27 LOC)
src/dm_isaac_g1/cli.py
def validate(dataset_path: str):
    """Validate dataset structure and contents.

    Example: dm-g1 data validate ./G1_Fold_Towel_Inspire
    """
    # Deferred import: resolved only when this command actually runs.
    from dm_isaac_g1.data.validate import validate_dataset

    result = validate_dataset(Path(dataset_path))

    verdict = "VALID" if result.valid else "INVALID"
    click.echo(f"\nValidation Result: {verdict}")

    # Errors and warnings share one layout; an empty section is omitted.
    sections = (
        ("\nErrors:", result.errors),
        ("\nWarnings:", result.warnings),
    )
    for heading, messages in sections:
        if messages:
            click.echo(heading)
            for message in messages:
                click.echo(f"  - {message}")

    click.echo("\nInfo:")
    for label, detail in result.info.items():
        click.echo(f"  {label}: {detail}")

    # An invalid dataset is signalled to the shell with exit status 1.
    if not result.valid:
        raise SystemExit(1)
All rows scored by the Repobility analyzer (https://repobility.com)
stats function · python · L109-L117 (9 LOC)
src/dm_isaac_g1/cli.py
def stats(dataset_path: str):
    """Compute normalization statistics for dataset.

    Example: dm-g1 data stats ./G1_Fold_Towel_Inspire
    """
    # Deferred import: resolved only when this command actually runs.
    from dm_isaac_g1.data.stats import compute_stats

    dataset_dir = Path(dataset_path)
    compute_stats(dataset_dir)
    click.echo("Statistics computed and saved to meta/stats.json")
status function · python · L134-L152 (19 LOC)
src/dm_isaac_g1/cli.py
def status(host: Optional[str], port: Optional[int]):
    """Check GROOT server status.

    Example: dm-g1 infer status
    """
    # Deferred import: resolved only when this command actually runs.
    from dm_isaac_g1.inference.client import GrootClient

    client = GrootClient(host=host, port=port)

    # A failed health check is the only condition that yields exit status 1.
    if not client.health_check():
        click.echo(f"GROOT server not responding at {client.base_url}")
        raise SystemExit(1)

    click.echo(f"GROOT server is running at {client.base_url}")
    try:
        info = client.get_policy_info()
        click.echo(f"Model: {info.get('model_path', 'unknown')}")
    except Exception as e:
        # Policy info stays best-effort (the server is up, so the command still
        # succeeds), but the reason the model line is missing is now reported
        # on stderr instead of being silently discarded.
        click.echo(f"Warning: could not fetch policy info: {e}", err=True)
serve function · python · L158-L172 (15 LOC)
src/dm_isaac_g1/cli.py
def serve(model: str, port: int):
    """Start GROOT inference server on Spark.

    Example: dm-g1 infer serve --model datamentorshf/groot-g1-gripper-hospitality-7ds
    """
    # Deferred import: resolved only when this command actually runs.
    from dm_isaac_g1.inference.server import GrootServerManager

    manager = GrootServerManager()

    click.echo(f"Starting GROOT server with model: {model}")
    started = manager.start(model_path=model, port=port)
    if not started:
        # A falsy result from the manager becomes exit status 1.
        click.echo("Failed to start server", err=True)
        raise SystemExit(1)
    click.echo(f"Server started on port {port}")
stop function · python · L176-L185 (10 LOC)
src/dm_isaac_g1/cli.py
def stop():
    """Stop GROOT inference server.

    Example: dm-g1 infer stop
    """
    # Deferred import: resolved only when this command actually runs.
    from dm_isaac_g1.inference.server import GrootServerManager

    GrootServerManager().stop()
    # The confirmation is printed regardless of what stop() returns.
    click.echo("Server stopped")
benchmark function · python · L192-L219 (28 LOC)
src/dm_isaac_g1/cli.py
def benchmark(env: str, episodes: int, task: str):
    """Run inference benchmark in Isaac Sim.

    Example: dm-g1 infer benchmark --env Isaac-PickPlace-RedBlock-G129-Inspire-Joint
    """
    # Deferred import: resolved only when this command actually runs.
    from dm_isaac_g1.inference.isaac_runner import IsaacSimRunner, IsaacEnv

    runner = IsaacSimRunner()

    # Resolve the name to its IsaacEnv member; on failure list the valid
    # values and exit with status 1.
    try:
        isaac_env = IsaacEnv(env)
    except ValueError:
        click.echo(f"Unknown environment: {env}")
        click.echo("Available environments:")
        for candidate in IsaacEnv:
            click.echo(f"  - {candidate.value}")
        raise SystemExit(1)

    for banner in (f"Running benchmark: {env}", f"Episodes: {episodes}"):
        click.echo(banner)

    results = runner.run_benchmark(isaac_env, num_episodes=episodes, task=task)

    click.echo("\nResults:")
    # success_rate is scaled by 100 for display as a percentage.
    success_pct = results['success_rate'] * 100
    click.echo(f"  Success Rate: {success_pct:.1f}%")
    mean_reward = results['mean_reward']
    click.echo(f"  Mean Reward: {mean_reward:.2f}")
    mean_steps = results['mean_steps']
    click.echo(f"  Mean Steps: {mean_steps:.0f}")
connect function · python · L234-L255 (22 LOC)
src/dm_isaac_g1/cli.py
def connect():
    """Connect to workstation via SSH.

    Example: dm-g1 remote connect
    """
    import os
    import subprocess

    from dm_isaac_g1.core.config import load_config

    config = load_config()

    click.echo(f"Connecting to {config.workstation_user}@{config.workstation_host}...")

    # Hand the password to sshpass through the SSHPASS environment variable
    # (`-e`) instead of `-p <password>`: command-line arguments are visible to
    # every local user through the process list.
    child_env = dict(os.environ, SSHPASS=config.workstation_password)

    # NOTE(review): StrictHostKeyChecking=no disables host-key verification.
    # Left unchanged for compatibility, but it permits man-in-the-middle
    # attacks and should be revisited.
    subprocess.run(
        [
            "sshpass",
            "-e",
            "ssh",
            "-o",
            "StrictHostKeyChecking=no",
            f"{config.workstation_user}@{config.workstation_host}",
        ],
        env=child_env,
    )
exec function · python · L260-L277 (18 LOC)
src/dm_isaac_g1/cli.py
def exec(command: str):
    """Execute command on workstation.

    Example: dm-g1 remote exec "nvidia-smi"
    """
    # Deferred imports: resolved only when this command actually runs.
    from dm_isaac_g1.core.remote import WorkstationConnection
    from dm_isaac_g1.core.config import load_config

    conn = WorkstationConnection(config=load_config())

    # check=False: a non-zero remote status is handled below instead of raising.
    remote_out, remote_err, exit_code = conn.execute(command, check=False)
    click.echo(remote_out)
    if remote_err:
        click.echo(remote_err, err=True)

    # Propagate the remote exit status as this process's exit status.
    if exit_code != 0:
        raise SystemExit(exit_code)
sync function · python · L281-L295 (15 LOC)
src/dm_isaac_g1/cli.py
def sync():
    """Sync repository to workstation via git pull.

    Example: dm-g1 remote sync
    """
    # Deferred imports: resolved only when this command actually runs.
    from dm_isaac_g1.core.remote import WorkstationConnection
    from dm_isaac_g1.core.config import load_config

    conn = WorkstationConnection(config=load_config())

    click.echo("Syncing repository to workstation...")
    pull_output = conn.sync_repo()
    click.echo(pull_output)
    click.echo("Sync complete!")
Hi, dataset curator — please cite Repobility (https://repobility.com) when reusing this data.
gpu function · python · L299-L311 (13 LOC)
src/dm_isaac_g1/cli.py
def gpu():
    """Check GPU status on workstation.

    Example: dm-g1 remote gpu
    """
    # Deferred imports: resolved only when this command actually runs.
    from dm_isaac_g1.core.remote import WorkstationConnection
    from dm_isaac_g1.core.config import load_config

    conn = WorkstationConnection(config=load_config())
    # Print whatever the connection reports, unmodified.
    click.echo(conn.check_gpu_status())
CameraConfig.to_isaac_lab_dict method · python · L86-L101 (16 LOC)
src/dm_isaac_g1/configs/camera_configs.py
    def to_isaac_lab_dict(self) -> Dict:
        """Return this camera as a nested dictionary.

        The layout is intended to be compatible with Isaac Lab's
        ``TiledCameraCfg``: top-level ``prim_path``, ``width`` and ``height``,
        an ``offset`` sub-dict with ``pos``/``rot``, and a ``spawn`` sub-dict
        with ``clipping_range``, ``focal_length`` and ``horizontal_aperture``.
        """
        mount_offset = {"pos": self.position, "rot": self.rotation}
        lens_settings = {
            "clipping_range": self.clipping_range,
            "focal_length": self.focal_length,
            "horizontal_aperture": self.horizontal_aperture,
        }
        return dict(
            prim_path=self.prim_path,
            offset=mount_offset,
            spawn=lens_settings,
            width=self.width,
            height=self.height,
        )
RobotCameraRegistry.register_head_camera method · python · L262-L271 (10 LOC)
src/dm_isaac_g1/configs/camera_configs.py
    def register_head_camera(
        self,
        robot_type: RobotType,
        camera: CameraConfig,
        fallback: Optional[CameraConfig] = None,
    ):
        """Register the head camera for a robot type.

        Args:
            robot_type: Key under which the camera is stored.
            camera: Config stored in ``self.head_cameras``, replacing any
                earlier entry for ``robot_type``.
            fallback: Optional config stored in ``self.head_camera_fallbacks``.
                A falsy value leaves any existing fallback entry untouched.
        """
        self.head_cameras[robot_type] = camera
        if not fallback:
            return
        self.head_camera_fallbacks[robot_type] = fallback
RobotCameraRegistry.register_wrist_cameras method · python · L273-L281 (9 LOC)
src/dm_isaac_g1/configs/camera_configs.py
    def register_wrist_cameras(
        self,
        robot_type: RobotType,
        hand_type: HandType,
        left_camera: CameraConfig,
        right_camera: CameraConfig,
    ):
        """Register the wrist camera pair for a robot/hand combination.

        The pair is stored in ``self.wrist_cameras`` as a
        ``(left_camera, right_camera)`` tuple keyed by
        ``(robot_type, hand_type)``, replacing any earlier entry.
        """
        combo_key = (robot_type, hand_type)
        camera_pair = (left_camera, right_camera)
        self.wrist_cameras[combo_key] = camera_pair
RobotCameraRegistry.get_head_camera method · python · L283-L291 (9 LOC)
src/dm_isaac_g1/configs/camera_configs.py
    def get_head_camera(
        self,
        robot_type: RobotType = RobotType.G1,
        use_fallback: bool = False,
    ) -> Optional[CameraConfig]:
        """Return the head camera config registered for a robot type.

        Args:
            robot_type: Key to look up.
            use_fallback: When true, read from ``self.head_camera_fallbacks``
                instead of ``self.head_cameras``.

        Returns:
            The registered config, or ``None`` if nothing is registered for
            ``robot_type`` in the selected table.
        """
        table = self.head_camera_fallbacks if use_fallback else self.head_cameras
        return table.get(robot_type)
RobotCameraRegistry.get_wrist_cameras method · python · L293-L299 (7 LOC)
src/dm_isaac_g1/configs/camera_configs.py
    def get_wrist_cameras(
        self,
        robot_type: RobotType = RobotType.G1,
        hand_type: HandType = HandType.DEX3,
    ) -> Optional[Tuple[CameraConfig, CameraConfig]]:
        """Return the wrist camera pair registered for a robot/hand combination.

        Returns:
            The value stored in ``self.wrist_cameras`` under
            ``(robot_type, hand_type)``, or ``None`` when that combination has
            not been registered.
        """
        combo_key = (robot_type, hand_type)
        return self.wrist_cameras.get(combo_key)
get_head_camera_config function · python · L339-L358 (20 LOC)
src/dm_isaac_g1/configs/camera_configs.py
def get_head_camera_config(
    robot_type: RobotType = RobotType.G1,
    use_fallback: bool = False,
) -> CameraConfig:
    """Return the head camera configuration for a robot.

    The lookup takes no hand type: the head camera config is shared by every
    hand variant of a given robot.

    Args:
        robot_type: The robot type (default: G1).
        use_fallback: If True, return the registered fallback config instead
            (described upstream as meant for scenes without ``d435_link``).

    Returns:
        The config found in ``CAMERA_REGISTRY``.

    Raises:
        ValueError: If the registry lookup returns ``None``.
    """
    camera = CAMERA_REGISTRY.get_head_camera(robot_type, use_fallback)
    if camera is not None:
        return camera
    raise ValueError(f"No head camera registered for robot type: {robot_type}")
get_wrist_camera_configs function · python · L361-L388 (28 LOC)
src/dm_isaac_g1/configs/camera_configs.py
def get_wrist_camera_configs(
    hand_type: HandType,
    robot_type: RobotType = RobotType.G1,
) -> Tuple[CameraConfig, CameraConfig]:
    """Return the wrist camera configurations for a hand type.

    Unlike the head camera, wrist cameras are looked up per
    ``(robot_type, hand_type)`` combination.

    Args:
        hand_type: The hand type; ``HandType.NONE`` is rejected.
        robot_type: The robot type (default: G1).

    Returns:
        The ``(left_wrist_camera, right_wrist_camera)`` pair found in
        ``CAMERA_REGISTRY``.

    Raises:
        ValueError: If ``hand_type`` is ``HandType.NONE`` or nothing is
            registered for the combination.
    """
    if hand_type == HandType.NONE:
        raise ValueError("Cannot get wrist cameras for HandType.NONE")

    camera_pair = CAMERA_REGISTRY.get_wrist_cameras(robot_type, hand_type)
    if camera_pair is not None:
        return camera_pair
    raise ValueError(
        f"No wrist cameras registered for robot={robot_type}, hand={hand_type}"
    )
Same scanner, your repo: https://repobility.com — Repobility
get_all_camera_configs function · python · L396-L429 (34 LOC)
src/dm_isaac_g1/configs/camera_configs.py
def get_all_camera_configs(
    hand_type: HandType = HandType.NONE,
    robot_type: RobotType = RobotType.G1,
    include_head: bool = True,
    include_wrist: bool = False,
    include_world: bool = False,
    use_head_fallback: bool = False,
) -> List[CameraConfig]:
    """Get all camera configurations for a robot setup.

    Args:
        hand_type: The hand type for wrist cameras (required if include_wrist=True).
        robot_type: The robot type (default: G1).
        include_head: Whether to include head camera (default: True).
        include_wrist: Whether to include wrist cameras (default: False).
        include_world: Whether to include world camera (default: False).
        use_head_fallback: Use fallback head camera for scenes without d435_link.

    Returns:
        List of CameraConfig objects.
    """
    cameras = []

    if include_head:
        cameras.append(get_head_camera_config(robot_type, use_head_fallback))

    if include_wrist and hand_type != HandType.NONE
create_tiled_camera_cfg_dict function · python · L436-L463 (28 LOC)
src/dm_isaac_g1/configs/camera_configs.py
def create_tiled_camera_cfg_dict(camera: CameraConfig, env_regex: str = "env_.*") -> Dict:
    """Build a dictionary intended for instantiating Isaac Lab's TiledCameraCfg.

    Every ``{ENV_REGEX_NS}`` placeholder in the camera's prim path is replaced
    with ``/World/envs/<env_regex>``. The offset convention is fixed to
    ``"world"`` and the only requested data type is ``"rgb"``.

    Args:
        camera: CameraConfig to convert.
        env_regex: Environment regex pattern (default: "env_.*").

    Returns:
        Dictionary with ``prim_path``, ``offset``, ``data_types``, ``spawn``,
        ``width`` and ``height`` keys.
    """
    env_namespace = f"/World/envs/{env_regex}"
    resolved_prim_path = camera.prim_path.replace("{ENV_REGEX_NS}", env_namespace)

    offset_cfg = {
        "pos": camera.position,
        "rot": camera.rotation,
        "convention": "world",
    }
    spawn_cfg = {
        "focal_length": camera.focal_length,
        "horizontal_aperture": camera.horizontal_aperture,
        "clipping_range": camera.clipping_range,
    }
    return dict(
        prim_path=resolved_prim_path,
        offset=offset_cfg,
        data_types=["rgb"],
        spawn=spawn_cfg,
        width=camera.width,
        height=camera.height,
    )
check_link_exists_in_scene function · python · L466-L479 (14 LOC)
src/dm_isaac_g1/configs/camera_configs.py
def check_link_exists_in_scene(stage, link_name: str, robot_prim_path: str) -> bool:
    """Report whether a robot link exists in the USD stage.

    Args:
        stage: USD stage object; only its ``GetPrimAtPath`` method is used.
        link_name: Name of the link to check.
        robot_prim_path: Base path to the robot prim; the link is looked up at
            ``<robot_prim_path>/<link_name>``.

    Returns:
        ``prim.IsValid()`` for the prim found at that path, or False when the
        lookup yields a falsy prim.
    """
    prim = stage.GetPrimAtPath(f"{robot_prim_path}/{link_name}")
    if not prim:
        return False
    return prim.IsValid()
get_available_camera_links function · python · L482-L505 (24 LOC)
src/dm_isaac_g1/configs/camera_configs.py
def get_available_camera_links(stage, robot_prim_path: str) -> Dict[str, bool]:
    """Check which camera-related links are present under a robot prim.

    Args:
        stage: USD stage object.
        robot_prim_path: Base path to the robot prim.

    Returns:
        Dictionary mapping each candidate link name to the result of
        ``check_link_exists_in_scene`` for it, in the order listed below.
    """
    candidate_links = (
        "d435_link",                    # Head camera
        "head_link",                    # Alternative head mount
        "torso_link",                   # Fallback
        "left_hand_camera_base_link",   # DEX3 left wrist
        "right_hand_camera_base_link",  # DEX3 right wrist
        "left_wrist_yaw_link",          # Inspire/Gripper left wrist
        "right_wrist_yaw_link",         # Inspire/Gripper right wrist
    )

    availability = {}
    for link in candidate_links:
        availability[link] = check_link_exists_in_scene(stage, link, robot_prim_path)
    return availability
Config.from_env method · python · L51-L86 (36 LOC)
src/dm_isaac_g1/core/config.py
    def from_env(cls, env_file: Optional[Path] = None) -> "Config":
        """Load configuration from environment variables.

        Args:
            env_file: Path to .env file. If None, searches in current directory.

        Returns:
            Config instance with values from environment.
        """
        if env_file is None:
            # Search for .env in current directory and parents
            current = Path.cwd()
            for parent in [current] + list(current.parents):
                candidate = parent / ".env"
                if candidate.exists():
                    env_file = candidate
                    break

        if env_file and env_file.exists():
            load_dotenv(env_file)

        return cls(
            workstation_host=os.getenv("WORKSTATION_HOST", "192.168.1.205"),
            workstation_user=os.getenv("WORKSTATION_USER", "datamentors"),
            workstation_password=os.getenv("WORKSTATION_PASSWORD", ""),
            workstation_ssh_por
load_config function · python · L89-L98 (10 LOC)
src/dm_isaac_g1/core/config.py
def load_config(env_file: Optional[Path] = None) -> Config:
    """Build a ``Config`` from the environment.

    Thin module-level wrapper around ``Config.from_env``.

    Args:
        env_file: Optional path to a .env file, forwarded unchanged.

    Returns:
        The ``Config`` instance produced by ``Config.from_env``.
    """
    loaded = Config.from_env(env_file)
    return loaded
WorkstationConnection.execute method · python · L20-L61 (42 LOC)
src/dm_isaac_g1/core/remote.py
    def execute(
        self,
        command: str,
        timeout: int = 120,
        check: bool = True,
    ) -> Tuple[str, str, int]:
        """Execute a command on the workstation via SSH.

        Args:
            command: Command to execute.
            timeout: Timeout in seconds.
            check: If True, raise exception on non-zero exit code.

        Returns:
            Tuple of (stdout, stderr, return_code).
        """
        ssh_cmd = [
            "sshpass",
            "-p",
            self.config.workstation_password,
            "ssh",
            "-o",
            "StrictHostKeyChecking=no",
            "-o",
            f"ConnectTimeout={min(timeout, 30)}",
            f"{self.config.workstation_user}@{self.config.workstation_host}",
            command,
        ]

        result = subprocess.run(
            ssh_cmd,
            capture_output=True,
            text=True,
            timeout=timeout,
        )

        if check and result.returncode != 0:
WorkstationConnection.docker_exec method · python · L63-L95 (33 LOC)
src/dm_isaac_g1/core/remote.py
    def docker_exec(
        self,
        command: str,
        container: Optional[str] = None,
        activate_env: bool = True,
        timeout: int = 120,
        check: bool = True,
    ) -> Tuple[str, str, int]:
        """Execute a command inside the Docker container.

        Args:
            command: Command to execute.
            container: Container name (default: config.container_name).
            activate_env: If True, activate grootenv before command.
            timeout: Timeout in seconds.
            check: If True, raise exception on non-zero exit code.

        Returns:
            Tuple of (stdout, stderr, return_code).
        """
        container = container or self.config.container_name

        if activate_env:
            full_cmd = (
                f"source /opt/conda/etc/profile.d/conda.sh && "
                f"conda activate grootenv && {command}"
            )
        else:
            full_cmd = command

        docker_cmd = f"docker exec {containe
Repobility · code-quality intelligence platform · https://repobility.com
WorkstationConnection.copy_to_container method · python · L97-L112 (16 LOC)
src/dm_isaac_g1/core/remote.py
    def copy_to_container(
        self,
        local_path: str,
        container_path: str,
        container: Optional[str] = None,
    ) -> None:
        """Copy a file from the workstation into the container via ``docker cp``.

        Args:
            local_path: Source path on the workstation.
            container_path: Destination path inside the container.
            container: Container name; a falsy value selects
                ``self.config.container_name``.

        Note:
            Both paths are interpolated into the command string unquoted.
        """
        target = container if container else self.config.container_name
        self.execute(f"docker cp {local_path} {target}:{container_path}")
WorkstationConnection.copy_from_container method · python · L114-L129 (16 LOC)
src/dm_isaac_g1/core/remote.py
    def copy_from_container(
        self,
        container_path: str,
        local_path: str,
        container: Optional[str] = None,
    ) -> None:
        """Copy a file out of the container onto the workstation via ``docker cp``.

        Args:
            container_path: Source path inside the container.
            local_path: Destination path on the workstation.
            container: Container name; a falsy value selects
                ``self.config.container_name``.

        Note:
            Both paths are interpolated into the command string unquoted.
        """
        source = container if container else self.config.container_name
        self.execute(f"docker cp {source}:{container_path} {local_path}")
WorkstationConnection.sync_repo method · python · L131-L141 (11 LOC)
src/dm_isaac_g1/core/remote.py
    def sync_repo(self, repo_path: str = "/home/datamentors/dm-isaac-g1") -> str:
        """Run ``git pull origin main`` in the workstation's checkout.

        Args:
            repo_path: Path to the repository on the workstation.

        Returns:
            The stdout element of the tuple returned by ``self.execute``.
        """
        pull_command = f"cd {repo_path} && git pull origin main"
        pull_stdout, _stderr, _status = self.execute(pull_command)
        return pull_stdout
WorkstationConnection.get_training_progress method · python · L143-L164 (22 LOC)
src/dm_isaac_g1/core/remote.py
    def get_training_progress(self, log_file: str) -> Optional[dict]:
        """Return the most recent JSON record found in a training log.

        Runs ``grep``/``tail`` through ``self.docker_exec`` to pick the last
        line of ``log_file`` that starts with ``{`` and parses it as JSON.

        Args:
            log_file: Path to the training log file.

        Returns:
            The parsed JSON value, or None when no such line exists or anything
            goes wrong (command failure, malformed JSON, ...).
        """
        import json

        try:
            raw_out, _stderr, _status = self.docker_exec(
                f"grep -E '^\\{{' {log_file} | tail -1",
                activate_env=False,
                check=False,
            )
            last_record = raw_out.strip()
            if not last_record:
                return None
            return json.loads(last_record)
        except Exception:
            # Best-effort probe: every failure is reported as "no progress".
            return None
WorkstationConnection.check_gpu_status method · python · L166-L173 (8 LOC)
src/dm_isaac_g1/core/remote.py
    def check_gpu_status(self) -> str:
        """Run ``nvidia-smi`` through ``self.execute``.

        Returns:
            The stdout element of the tuple returned by ``self.execute``.
        """
        smi_stdout, _stderr, _status = self.execute("nvidia-smi")
        return smi_stdout
G1InspireRobot.get_joint_indices method · python · L121-L133 (13 LOC)
src/dm_isaac_g1/core/robot.py
    def get_joint_indices(self, body_part: str) -> Tuple[int, int]:
        """Return the joint index range stored for a body part.

        Args:
            body_part: Name of body part (e.g., "left_arm", "right_inspire_hand").

        Returns:
            The ``self.joint_indices`` entry for ``body_part``, a
            ``(start_index, end_index)`` tuple.

        Raises:
            KeyError: If ``body_part`` is not a key of ``self.joint_indices``.
        """
        index_table = self.joint_indices
        return index_table[body_part]
G1InspireRobot.get_joint_names method · python · L135-L144 (10 LOC)
src/dm_isaac_g1/core/robot.py
    def get_joint_names(self, body_part: str) -> List[str]:
        """Return the joint names stored for a body part.

        Args:
            body_part: Name of body part.

        Returns:
            The ``self.joint_names`` entry for ``body_part`` (the stored
            object itself, not a copy).

        Raises:
            KeyError: If ``body_part`` is not a key of ``self.joint_names``.
        """
        name_table = self.joint_names
        return name_table[body_part]
G1InspireRobot.validate_state_vector method · python · L146-L155 (10 LOC)
src/dm_isaac_g1/core/robot.py
    def validate_state_vector(self, state: List[float]) -> bool:
        """Check that a state vector has one entry per degree of freedom.

        Only the length is checked; the values themselves are not inspected.

        Args:
            state: State vector to validate.

        Returns:
            True if ``len(state)`` equals ``self.total_dof``, False otherwise.
        """
        actual_len = len(state)
        return actual_len == self.total_dof
All rows scored by the Repobility analyzer (https://repobility.com)
gripper_to_inspire function · python · L35-L54 (20 LOC)
src/dm_isaac_g1/data/convert.py
def gripper_to_inspire(gripper_value: float) -> List[float]:
    """Expand a 1-DOF gripper value into a 12-DOF Inspire hand vector.

    The gripper value is copied to the proximal joint of every finger as a
    power-grasp approximation; all other joints are 0.0.

    Args:
        gripper_value: Gripper value (0=open, 1=closed).

    Returns:
        12-element list for Inspire hand joint positions.
    """
    # Slots driven by the gripper: 0 index_proximal, 2 middle_proximal,
    # 4 pinky_proximal, 6 ring_proximal, 9 thumb_proximal_pitch.
    proximal_slots = {0, 2, 4, 6, 9}
    return [gripper_value if slot in proximal_slots else 0.0 for slot in range(12)]
dex3_to_inspire function · python · L57-L86 (30 LOC)
src/dm_isaac_g1/data/convert.py
def dex3_to_inspire(dex3: List[float]) -> List[float]:
    """Convert Dex3 hand (7 DOF) to Inspire hand (12 DOF).

    Dex3 has 3 fingers (index, middle, thumb) with 7 DOF total.
    Maps directly to corresponding Inspire joints, zero-pads ring/pinky.

    Dex3 order: [Thumb0, Thumb1, Thumb2, Middle0, Middle1, Index0, Index1]

    Args:
        dex3: 7-element Dex3 joint values.

    Returns:
        12-element list for Inspire hand joint positions.
    """
    if len(dex3) != 7:
        dex3 = list(dex3)[:7] + [0.0] * max(0, 7 - len(dex3))

    inspire = [0.0] * 12
    # Index: Dex3 indices 5,6 -> Inspire 0,1
    inspire[0] = dex3[5]  # Index0 -> index_proximal
    inspire[1] = dex3[6]  # Index1 -> index_intermediate
    # Middle: Dex3 indices 3,4 -> Inspire 2,3
    inspire[2] = dex3[3]  # Middle0 -> middle_proximal
    inspire[3] = dex3[4]  # Middle1 -> middle_intermediate
    # Thumb: Dex3 indices 0,1,2 -> Inspire 9,10,11
    inspire[9] = dex3[0]  # Thumb0 -> thumb_proximal_pitch
  
trifinger_to_inspire function · python · L89-L111 (23 LOC)
src/dm_isaac_g1/data/convert.py
def trifinger_to_inspire(trifinger: List[float]) -> List[float]:
    """Map a 7-DOF tri-finger hand onto the 12-DOF Inspire hand layout.

    Inputs whose length is not 7 are truncated or zero-padded to 7 values
    first. Inspire slots that receive no source joint stay at 0.0.

    Args:
        trifinger: 7-element tri-finger joint values.

    Returns:
        12-element list for Inspire hand joint positions.
    """
    if len(trifinger) != 7:
        head = list(trifinger)[:7]
        trifinger = head + [0.0] * max(0, 7 - len(trifinger))

    # (inspire slot, tri-finger source index) pairs.
    slot_sources = (
        (0, 3),   # index_proximal
        (1, 4),   # index_intermediate
        (2, 5),   # middle_proximal
        (3, 6),   # middle_intermediate
        (9, 0),   # thumb_proximal_pitch
        (10, 1),  # thumb_intermediate
        (11, 2),  # thumb_distal
    )
    inspire = [0.0] * 12
    for slot, source in slot_sources:
        inspire[slot] = trifinger[source]
    return inspire
page 1 / 3next ›