mirror of
https://github.com/SikongJueluo/ya-vla.git
synced 2025-12-20 06:27:49 +08:00
refector: use mujoco to replace genesis
This commit is contained in:
298
src/main.py
298
src/main.py
@@ -1,126 +1,194 @@
|
||||
from typing import cast
|
||||
"""
|
||||
Simple OpenVLA Demo with Robosuite + MuJoCo
|
||||
Uses the official OpenVLA model from HuggingFace
|
||||
"""
|
||||
|
||||
import genesis as gs
|
||||
import numpy as np
|
||||
|
||||
########################## init ##########################
|
||||
gs.init(backend=gs.gs_backend.gpu)
|
||||
|
||||
try:
|
||||
from genesis.engine.entities import RigidEntity
|
||||
except ImportError:
|
||||
raise ImportError("genesis.engine.entities.RigidEntity is not available")
|
||||
|
||||
########################## create a scene ##########################
|
||||
scene = gs.Scene(
|
||||
viewer_options=gs.options.ViewerOptions(
|
||||
camera_pos=(0, -3.5, 2.5),
|
||||
camera_lookat=(0.0, 0.0, 0.5),
|
||||
camera_fov=30,
|
||||
res=(960, 640),
|
||||
max_FPS=60,
|
||||
),
|
||||
sim_options=gs.options.SimOptions(
|
||||
dt=0.01,
|
||||
),
|
||||
show_viewer=True,
|
||||
)
|
||||
|
||||
########################## entities ##########################
|
||||
plane = scene.add_entity(
|
||||
gs.morphs.Plane(),
|
||||
)
|
||||
franka = scene.add_entity(
|
||||
gs.morphs.MJCF(
|
||||
file="xml/franka_emika_panda/panda.xml",
|
||||
),
|
||||
)
|
||||
franka = cast(RigidEntity, franka)
|
||||
import robosuite as suite
|
||||
import torch
|
||||
from PIL import Image
|
||||
from transformers import AutoModelForVision2Seq, AutoProcessor
|
||||
|
||||
|
||||
########################## build ##########################
|
||||
scene.build()
|
||||
class OpenVLADemo:
|
||||
"""OpenVLA Demo with Robosuite using HuggingFace model"""
|
||||
|
||||
jnt_names = [
|
||||
"joint1",
|
||||
"joint2",
|
||||
"joint3",
|
||||
"joint4",
|
||||
"joint5",
|
||||
"joint6",
|
||||
"joint7",
|
||||
"finger_joint1",
|
||||
"finger_joint2",
|
||||
]
|
||||
dofs_idx = [franka.get_joint(name).dof_idx_local for name in jnt_names]
|
||||
# OpenVLA action names (7-DoF)
|
||||
ACTION_NAMES = [
|
||||
"delta_x",
|
||||
"delta_y",
|
||||
"delta_z",
|
||||
"delta_roll",
|
||||
"delta_pitch",
|
||||
"delta_yaw",
|
||||
"gripper",
|
||||
]
|
||||
|
||||
############ Optional: set control gains ############
|
||||
# set positional gains
|
||||
franka.set_dofs_kp(
|
||||
kp=np.array([4500, 4500, 3500, 3500, 2000, 2000, 2000, 100, 100]),
|
||||
dofs_idx_local=dofs_idx,
|
||||
)
|
||||
# set velocity gains
|
||||
franka.set_dofs_kv(
|
||||
kv=np.array([450, 450, 350, 350, 200, 200, 200, 10, 10]),
|
||||
dofs_idx_local=dofs_idx,
|
||||
)
|
||||
# set force range for safety
|
||||
franka.set_dofs_force_range(
|
||||
lower=np.array([-87, -87, -87, -87, -12, -12, -12, -100, -100]),
|
||||
upper=np.array([87, 87, 87, 87, 12, 12, 12, 100, 100]),
|
||||
dofs_idx_local=dofs_idx,
|
||||
)
|
||||
# Hard reset
|
||||
for i in range(150):
|
||||
if i < 50:
|
||||
franka.set_dofs_position(np.array([1, 1, 0, 0, 0, 0, 0, 0.04, 0.04]), dofs_idx)
|
||||
elif i < 100:
|
||||
franka.set_dofs_position(
|
||||
np.array([-1, 0.8, 1, -2, 1, 0.5, -0.5, 0.04, 0.04]), dofs_idx
|
||||
def __init__(
|
||||
self,
|
||||
env_name: str = "Lift",
|
||||
robot: str = "Panda",
|
||||
instruction: str = "pick up the red cube",
|
||||
model_name: str = "openvla/openvla-7b",
|
||||
):
|
||||
self.instruction = instruction
|
||||
self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
|
||||
print(f"[INFO] Using device: {self.device}")
|
||||
|
||||
# Load OpenVLA model from HuggingFace
|
||||
print(f"[INFO] Loading OpenVLA model: {model_name}")
|
||||
print("[INFO] This may take a while on first run...")
|
||||
|
||||
self.processor = AutoProcessor.from_pretrained(
|
||||
model_name, trust_remote_code=True
|
||||
)
|
||||
else:
|
||||
franka.set_dofs_position(np.array([0, 0, 0, 0, 0, 0, 0, 0, 0]), dofs_idx)
|
||||
|
||||
scene.step()
|
||||
# Load model with appropriate settings
|
||||
self.model = AutoModelForVision2Seq.from_pretrained(
|
||||
model_name,
|
||||
torch_dtype=torch.bfloat16,
|
||||
low_cpu_mem_usage=True,
|
||||
trust_remote_code=True,
|
||||
).to(self.device)
|
||||
|
||||
# PD control
|
||||
for i in range(1250):
|
||||
if i == 0:
|
||||
franka.control_dofs_position(
|
||||
np.array([1, 1, 0, 0, 0, 0, 0, 0.04, 0.04]),
|
||||
dofs_idx,
|
||||
)
|
||||
elif i == 250:
|
||||
franka.control_dofs_position(
|
||||
np.array([-1, 0.8, 1, -2, 1, 0.5, -0.5, 0.04, 0.04]),
|
||||
dofs_idx,
|
||||
)
|
||||
elif i == 500:
|
||||
franka.control_dofs_position(
|
||||
np.array([0, 0, 0, 0, 0, 0, 0, 0, 0]),
|
||||
dofs_idx,
|
||||
)
|
||||
elif i == 750:
|
||||
# control first dof with velocity, and the rest with position
|
||||
franka.control_dofs_position(
|
||||
np.array([0, 0, 0, 0, 0, 0, 0, 0, 0])[1:],
|
||||
dofs_idx[1:],
|
||||
)
|
||||
franka.control_dofs_velocity(
|
||||
np.array([1.0, 0, 0, 0, 0, 0, 0, 0, 0])[:1],
|
||||
dofs_idx[:1],
|
||||
)
|
||||
elif i == 1000:
|
||||
franka.control_dofs_force(
|
||||
np.array([0, 0, 0, 0, 0, 0, 0, 0, 0]),
|
||||
dofs_idx,
|
||||
)
|
||||
# This is the control force computed based on the given control command
|
||||
# If using force control, it's the same as the given control command
|
||||
print("control force:", franka.get_dofs_control_force(dofs_idx))
|
||||
self.model.eval()
|
||||
print("[INFO] OpenVLA model loaded successfully!")
|
||||
|
||||
# This is the actual force experienced by the dof
|
||||
print("internal force:", franka.get_dofs_force(dofs_idx))
|
||||
# Create robosuite environment
|
||||
print(f"[INFO] Creating {env_name} environment with {robot} robot...")
|
||||
self.env = suite.make(
|
||||
env_name=env_name,
|
||||
robots=robot,
|
||||
has_renderer=True,
|
||||
has_offscreen_renderer=True,
|
||||
use_camera_obs=True,
|
||||
camera_names="agentview",
|
||||
camera_heights=224,
|
||||
camera_widths=224,
|
||||
control_freq=20,
|
||||
render_camera="frontview",
|
||||
)
|
||||
|
||||
scene.step()
|
||||
print(f"[INFO] Environment created! Action dim: {self.env.action_dim}")
|
||||
|
||||
def get_prompt(self) -> str:
|
||||
"""Format the prompt for OpenVLA"""
|
||||
return f"In: What action should the robot take to {self.instruction}?\nOut:"
|
||||
|
||||
@torch.no_grad()
|
||||
def get_action(self, obs: dict) -> np.ndarray:
|
||||
"""Get action from OpenVLA model"""
|
||||
# Get image from observation and convert to PIL
|
||||
image_array = obs["agentview_image"]
|
||||
# Robosuite returns (H, W, C) uint8 image
|
||||
image = Image.fromarray(image_array)
|
||||
|
||||
# Prepare prompt
|
||||
prompt = self.get_prompt()
|
||||
|
||||
# Process inputs
|
||||
inputs = self.processor(prompt, image).to(self.device, dtype=torch.bfloat16)
|
||||
|
||||
# Predict action using OpenVLA
|
||||
# Note: unnorm_key should match your robot setup
|
||||
# For simulation, we'll use raw normalized actions
|
||||
action = self.model.predict_action(**inputs, do_sample=False)
|
||||
|
||||
# Action is numpy array of shape (7,)
|
||||
return action
|
||||
|
||||
def print_vla_output(self, step: int, action: np.ndarray, reward: float):
|
||||
"""Print detailed VLA output to terminal"""
|
||||
print(f"\n[Step {step:6d}] OpenVLA Output:")
|
||||
print(f' Instruction: "{self.instruction}"')
|
||||
print(" Actions:")
|
||||
for name, value in zip(self.ACTION_NAMES, action):
|
||||
print(f" {name:12s}: {value:+.4f}")
|
||||
print(f" Step Reward: {reward:.4f}")
|
||||
|
||||
def run(self):
|
||||
"""Run the demo forever"""
|
||||
print("\n" + "=" * 60)
|
||||
print("OpenVLA Demo Started (Running Forever)")
|
||||
print(f'Instruction: "{self.instruction}"')
|
||||
print("Close the MuJoCo viewer window to quit")
|
||||
print("=" * 60 + "\n")
|
||||
|
||||
episode = 0
|
||||
total_steps = 0
|
||||
|
||||
# Run forever
|
||||
while True:
|
||||
episode += 1
|
||||
print(f"\n{'=' * 60}")
|
||||
print(f"Episode {episode} Started")
|
||||
print("=" * 60)
|
||||
|
||||
obs = self.env.reset()
|
||||
episode_reward = 0.0
|
||||
episode_steps = 0
|
||||
|
||||
while True:
|
||||
# Render environment with MuJoCo viewer
|
||||
self.env.render()
|
||||
|
||||
# Get action from OpenVLA model
|
||||
action = self.get_action(obs)
|
||||
|
||||
# Robosuite expects action of env.action_dim size
|
||||
# OpenVLA outputs 7-DoF, pad or truncate if needed
|
||||
if len(action) < self.env.action_dim:
|
||||
action = np.concatenate(
|
||||
[action, np.zeros(self.env.action_dim - len(action))]
|
||||
)
|
||||
elif len(action) > self.env.action_dim:
|
||||
action = action[: self.env.action_dim]
|
||||
|
||||
# Step environment
|
||||
obs, reward, done, info = self.env.step(action)
|
||||
episode_reward += reward
|
||||
episode_steps += 1
|
||||
total_steps += 1
|
||||
|
||||
# Print VLA output every 20 steps
|
||||
if episode_steps % 20 == 0:
|
||||
self.print_vla_output(total_steps, action, reward)
|
||||
|
||||
if done:
|
||||
print(f"\n[INFO] Episode {episode} done at step {episode_steps}")
|
||||
print(f"[INFO] Episode Reward: {episode_reward:.3f}")
|
||||
break
|
||||
|
||||
print(f"\nEpisode {episode} Summary:")
|
||||
print(f" Steps: {episode_steps}")
|
||||
print(f" Total Reward: {episode_reward:.3f}")
|
||||
print(f" Total Steps (all episodes): {total_steps}")
|
||||
|
||||
def close(self):
|
||||
"""Clean up"""
|
||||
self.env.close()
|
||||
|
||||
|
||||
def main():
|
||||
print("=" * 60)
|
||||
print("OpenVLA Demo with Robosuite + MuJoCo")
|
||||
print("Using OpenVLA-7B from HuggingFace")
|
||||
print("=" * 60)
|
||||
|
||||
demo = OpenVLADemo(
|
||||
env_name="Lift",
|
||||
robot="Panda",
|
||||
instruction="pick up the red cube",
|
||||
)
|
||||
|
||||
try:
|
||||
demo.run()
|
||||
except KeyboardInterrupt:
|
||||
print("\n[INFO] Interrupted by user")
|
||||
finally:
|
||||
demo.close()
|
||||
print("[INFO] Demo closed")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
|
||||
Reference in New Issue
Block a user