diff --git a/pufferlib/config/ocean/drive.ini b/pufferlib/config/ocean/drive.ini index c5a68e0f9..e68a53185 100644 --- a/pufferlib/config/ocean/drive.ini +++ b/pufferlib/config/ocean/drive.ini @@ -74,6 +74,11 @@ spawn_length_min = 2.0 spawn_length_max = 5.5 spawn_height = 1.5 +; Render mode options: +; 0:"window" = pop-up raylib window (original) +; 1:"headless" = off-screen; frames piped to ffmpeg (recommended for training) +render_mode = 1 + turn_off_normalization = 1 ; Options: 0 - Use normalized reward coefs as input to NN, 1 - Dont @@ -171,28 +176,35 @@ vf_clip_coef = 0.1999999999999999 vf_coef = 2 vtrace_c_clip = 1 vtrace_rho_clip = 1 -checkpoint_interval = 250 -; Rendering options -render = True -render_async = True # Render in background process to avoid blocking training -render_interval = 1000 -; If True, show exactly what the agent sees in agent observation -obs_only = True -; Show grid lines -show_grid = True -; Draws lines from ego agent observed ORUs and road elements to show detection range -show_lasers = False -; Display human xy logs in the background -show_human_logs = False -; If True, zoom in on a part of the map. 
Otherwise, show full map -zoom_in = True -; Options: List[str to path], str to path (e.g., "resources/drive/training/binaries/map_001.bin"), None -render_map = none +checkpoint_interval = 1000 [eval] eval_interval = 1000 -; Path to dataset used for evaluation -map_dir = "resources/drive/binaries/training" +; Map directory for self-play evaluation (Carla maps) +sp_map_dir = "resources/drive/binaries/carla_2D" +; Map directory for human replay evaluation (WOMD training scenarios) +hr_map_dir = "resources/drive/binaries/training" + +; Number of agents for self-play evaluation +num_eval_agents = 64 +; Number of agents for human replay evaluation (one SDC per scenario) +human_replay_num_agents = 16 +; If True, enable self-play evaluation (pair policy-controlled agent with a copy of itself) +self_play_eval = True +; If True, enable human replay evaluation (pair policy-controlled agent with human replays) +human_replay_eval = False +; Control mode for human replay: "control_sdc_only" controls only the SDC; others replay logged trajectories +human_replay_control_mode = "control_sdc_only" +; Which env to render during eval. Options: "first" (by index), "worst_collision", "random" +render_select_mode = "first" +; View mode(s) for eval rendering. Options: sim_state, bev, persp, both (sim_state+persp), all (sim_state+persp+bev) +; Multi-view options run a separate rollout per view, each producing its own wandb video under render/{mode}/{view} +render_view_mode = "all" +; If True, render random scenarios. Note: Doing this frequently will slow down the training. 
+render_human_replay_eval = False +render_self_play_eval = True + + ; Number of scenarios to process per batch wosac_batch_size = 32 ; Target number of unique scenarios perform evaluation in @@ -222,18 +234,13 @@ wosac_sanity_check = False wosac_aggregate_results = True ; Evaluation mode: "policy", "ground_truth" wosac_eval_mode = "policy" -; If True, enable human replay evaluation (pair policy-controlled agent with human replays) -human_replay_eval = False -; Control only the self-driving car -human_replay_control_mode = "control_sdc_only" -; Number of scenarios for human replay evaluation equals the number of agents -human_replay_num_agents = 16 [safe_eval] ; If True, periodically run policy with safe/law-abiding reward conditioning and log metrics enabled = True ; How often to run safe eval (in training epochs). Defaults to render_interval. interval = 250 +render_safe_eval = True ; Number of agents to run in the eval environment num_agents = 50 ; Number of episodes to collect metrics over @@ -271,29 +278,23 @@ steer = 1.0 acc = 1.0 [render] -; Mode to render a bunch of maps with a given policy -; Path to dataset used for rendering -map_dir = "resources/drive/binaries/training" -; Directory to output rendered videos +; Render a batch of maps offline using the in-process c_render (ffmpeg) pipeline. 
+; Path to the .bin map directory +map_dir = "resources/drive/binaries/carla_2D" +; Directory to write output mp4 files output_dir = "resources/drive/render_videos" -; Evaluation will run on the first num_maps maps in the map_dir directory -num_maps = 100 -; "both", "topdown", "agent"; Other args are passed from train confs -view_mode = "both" -; Policy bin file used for rendering videos -policy_path = "resources/drive/puffer_drive_weights_resampling_300.bin" +; Number of maps to render (capped at files available in map_dir) +num_maps = 3 +; View mode: sim_state (top-down, origin-centered), zoom_out (top-down, full map bbox), bev (agent BEV obs), persp (third-person follow-cam) +; Multi-view: "both" = sim_state + persp, "all" = sim_state + persp + bev (runs a separate rollout per view) +; NOTE: "persp" and "bev" require active_agent_count > 0 +view_mode = "persp" +; Whether to draw agent trajectory traces +draw_traces = True ; Allows more than cpu cores workers for rendering overwork = True -; If True, show exactly what the agent sees in agent observation -obs_only = True -; Show grid lines -show_grid = True -; Draws lines from ego agent observed ORUs and road elements to show detection range -show_lasers = True ; Display human xy logs in the background show_human_logs = False -; If True, zoom in on a part of the map. Otherwise, show full map -zoom_in = True [sweep.train.learning_rate] distribution = log_normal diff --git a/pufferlib/ocean/benchmark/evaluator.py b/pufferlib/ocean/benchmark/evaluator.py index 15b37208e..55b0f5a3e 100644 --- a/pufferlib/ocean/benchmark/evaluator.py +++ b/pufferlib/ocean/benchmark/evaluator.py @@ -1,5 +1,6 @@ """WOSAC evaluation class for PufferDrive.""" +import copy import torch import numpy as np import pandas as pd @@ -818,6 +819,277 @@ def rollout(self, args, puffer_env, policy): return results +class Evaluator: + """Evaluates policies in self_play, human_replay, with optional rendering. 
+ + Initializes the eval envs needed based on eval config flags: + - human_replay_eval: creates hr_env (control_sdc_only) + - self_play_eval: creates sp_env (control_agents) + """ + + RENDER_FIRST = "first" + RENDER_RANDOM = "random" + RENDER_WORST_SCORE = "worst_score" + RENDER_WORST_COLLISION = "worst_collision" + + def __init__(self, configs, logger=None): + self.configs = configs + self.logger = logger + self.self_play_stats = None + self.human_replay_stats = None + self.sp_env = None + self.hr_env = None + + self._unpack_eval_configs(configs) + + def _unpack_eval_configs(self, configs): + from pufferlib.ocean.drive.drive import RenderView + + _VIEW_MODE_MAP = { + "sim_state": [RenderView.FULL_SIM_STATE], + "topdown": [RenderView.FULL_SIM_STATE], + "bev": [RenderView.BEV_AGENT_OBS], + "agent": [RenderView.BEV_AGENT_OBS], + "persp": [RenderView.AGENT_PERSP], + "both": [RenderView.FULL_SIM_STATE, RenderView.AGENT_PERSP], + "all": [RenderView.FULL_SIM_STATE, RenderView.AGENT_PERSP, RenderView.BEV_AGENT_OBS], + } + _VIEW_SUFFIX = { + RenderView.FULL_SIM_STATE: "sim_state", + RenderView.AGENT_PERSP: "persp", + RenderView.BEV_AGENT_OBS: "bev", + } + + eval_config = copy.deepcopy(configs) + # Create separate evaluation environments based on specified configs + eval_config["env"]["termination_mode"] = 0 + backend = eval_config["eval"].get("backend", "PufferEnv") + eval_config["vec"] = dict(backend=backend, num_envs=1) + + self.render_sp_rollout = self.configs["eval"]["render_self_play_eval"] + self.render_hr_rollout = self.configs["eval"]["render_human_replay_eval"] + + view_mode_str = str(self.configs["eval"].get("render_view_mode", "sim_state")).lower().strip('"').strip("'") + self.render_view_modes = _VIEW_MODE_MAP.get(view_mode_str, [RenderView.FULL_SIM_STATE]) + self.render_view_suffix = _VIEW_SUFFIX + + # --- Human replay config --- + hr_control_mode = ( + str(self.configs["eval"].get("human_replay_control_mode", "control_sdc_only")).strip('"').strip("'") + ) + 
hr_num_agents = int(self.configs["eval"].get("human_replay_num_agents", 16)) + hr_map_dir = ( + str(self.configs["eval"].get("hr_map_dir", "resources/drive/binaries/training")).strip('"').strip("'") + ) + + self.hr_eval_config = copy.deepcopy(eval_config) + self.hr_eval_config["env"]["map_dir"] = hr_map_dir + self.hr_eval_config["env"]["num_agents"] = hr_num_agents + self.hr_eval_config["env"]["control_mode"] = hr_control_mode + self.hr_eval_config["env"]["init_mode"] = "create_all_valid" + self.hr_eval_config["env"]["render_mode"] = ( + 0 # primary env: stats only; render envs created per-view in rollout() + ) + if self.configs["eval"]["human_replay_eval"]: + self.hr_eval_config["env"]["episode_length"] = 91 # WOMD scenario length + + # --- Self-play config --- + sp_map_dir = ( + str(self.configs["eval"].get("sp_map_dir", "resources/drive/binaries/carla_2D")).strip('"').strip("'") + ) + sp_num_agents = int(self.configs["eval"].get("num_eval_agents", 64)) + + self.sp_eval_config = copy.deepcopy(eval_config) + self.sp_eval_config["env"]["map_dir"] = sp_map_dir + self.sp_eval_config["env"]["num_agents"] = sp_num_agents + self.sp_eval_config["env"]["control_mode"] = "control_agents" + self.sp_eval_config["env"]["render_mode"] = ( + 0 # primary env: stats only; render envs created per-view in rollout() + ) + + self.render_select_mode = self.configs["eval"]["render_select_mode"] + + def select_render_env(self, env_logs): + """Select which environment to render based on per-env rollout statistics. + Args: + env_logs: List of dicts, one per environment. Each dict contains + aggregated agent statistics (score, collision_rate, offroad_rate, etc.) + with 'n' being the number of controlled agents in that env. + Empty dicts indicate no data was collected for that env. + + Returns: + int: Index of the environment to render. 
+ """ + mode = self.render_select_mode + if mode == self.RENDER_FIRST: + return 0 + if mode == self.RENDER_RANDOM: + return np.random.randint(len(env_logs)) + + populated = [(i, log) for i, log in enumerate(env_logs) if log] + + if not populated: + return 0 + + if mode == self.RENDER_WORST_SCORE: + return min(populated, key=lambda x: x[1].get("score", 1.0))[0] + elif mode == self.RENDER_WORST_COLLISION: + return max(populated, key=lambda x: x[1].get("collision_rate", 0.0))[0] + # Add other modes based on desiderata here + return 0 + + def rollout(self, policy, mode="self_play"): + """Roll out the given policy in the specified eval env and collect statistics. + + Stats are collected using the primary env (already loaded by pufferl.py). + If rendering is enabled, each view mode gets its own temporary env with + render_mode=1, so each view has its own ffmpeg pipe and uniquely named mp4. + """ + from pufferlib.pufferl import load_env + + if mode == "human_replay": + env = self.hr_env + eval_config = self.hr_eval_config + render_eval = self.render_hr_rollout + else: # self_play + env = self.sp_env + eval_config = self.sp_eval_config + render_eval = self.render_sp_rollout + driver = env.driver_env + + needs_stats_first = render_eval and self.render_select_mode not in (self.RENDER_FIRST, self.RENDER_RANDOM) + + if needs_stats_first: + env_logs = self._run_rollout(policy, env, per_env_logs=True) + render_env_idx = self.select_render_env(env_logs) + else: + render_env_idx = self.select_render_env([{}] * driver.num_envs) + + # Collect stats from the primary env (no rendering) + info_list = self._run_rollout(policy, env) + final_info = info_list[0] if info_list else {} + + # Render each view in its own temporary env so each gets its own ffmpeg pipe + # and uniquely named mp4 (e.g. {scenario_id}_bev.mp4). 
+ if render_eval: + view_modes = self.render_view_modes + multi_view = len(view_modes) > 1 + for view_mode in view_modes: + suffix = f"_{self.render_view_suffix[view_mode]}" if multi_view else "" + render_cfg = copy.deepcopy(eval_config) + render_cfg["env"]["render_mode"] = 1 + render_env = load_env("puffer_drive", render_cfg) + try: + self._run_rollout( + policy, + render_env, + render_env_idx=render_env_idx, + view_mode=view_mode, + view_suffix=suffix, + ) + finally: + render_env.close() + + if mode == "self_play": + self.self_play_stats = final_info + self.self_play_stats["render_env_idx"] = render_env_idx + elif mode == "human_replay": + self.human_replay_stats = final_info + self.human_replay_stats["render_env_idx"] = render_env_idx + + def _run_rollout(self, policy, env, render_env_idx=None, per_env_logs=False, view_mode=None, view_suffix=""): + """Run a single rollout. If render_env_idx is not None, render that env. + + Thin wrapper around :func:`pufferlib.ocean.drive.rollout.rollout_loop`. + The shared helper owns the actual forward-sample-step-break loop; this + method just builds the RenderContext when rendering is requested. 
+ """ + from pufferlib.ocean.drive.drive import RenderView + from pufferlib.ocean.drive.rollout import RenderContext, rollout_loop + + render_ctx = None + if render_env_idx is not None: + render_ctx = RenderContext( + view_mode=view_mode if view_mode is not None else RenderView.FULL_SIM_STATE, + env_id=render_env_idx, + video_suffix=view_suffix, + ) + + return rollout_loop( + policy=policy, + env=env, + device=self.configs["train"]["device"], + use_rnn=self.configs["train"]["use_rnn"], + render_ctx=render_ctx, + per_env_logs=per_env_logs, + ) + + def log_videos(self, eval_mode, epoch): + """Log all mp4s in local path to wandb after env close has flushed ffmpeg pipes.""" + import os + import glob + + if not (self.logger and hasattr(self.logger, "wandb") and self.logger.wandb): + # Still clean up even if not logging + for p in glob.glob("*.mp4"): + os.remove(p) + return + + import wandb + + video_files = glob.glob("*.mp4") + if not video_files: + print("Warning: no render videos found in local path") + return + + render_mode = self.render_select_mode + multi_view = len(self.render_view_modes) > 1 + _known_suffixes = {"_sim_state", "_persp", "_bev"} + + for p in video_files: + stem = os.path.splitext(os.path.basename(p))[0] + # Extract view suffix from filename if present (e.g. 
"abc123_bev" → view="bev") + view_tag = "" + if multi_view: + for s in _known_suffixes: + if stem.endswith(s): + view_tag = s[1:] # strip leading "_" + stem = stem[: -len(s)] + break + scenario_id = stem + caption = f"scene_{scenario_id}_epoch_{epoch}_select_{render_mode}" + wandb_key = f"render/{eval_mode}/{view_tag}" if view_tag else f"render/{eval_mode}" + self.logger.wandb.log({wandb_key: wandb.Video(p, format="mp4", caption=caption)}) + + # Clean up + for p in video_files: + os.remove(p) + + def log_stats(self): + if not (self.logger and hasattr(self.logger, "wandb") and self.logger.wandb): + return + + eval_stats = {} + + if self.human_replay_stats is not None: + if "collision_rate" in self.human_replay_stats: + eval_stats["eval/hr_collision_rate"] = self.human_replay_stats["collision_rate"] + if "score" in self.human_replay_stats: + eval_stats["eval/hr_score"] = self.human_replay_stats["score"] + if self.self_play_stats is not None: + if "collision_rate" in self.self_play_stats: + eval_stats["eval/sp_collision_rate"] = self.self_play_stats["collision_rate"] + if "score" in self.self_play_stats: + eval_stats["eval/sp_score"] = self.self_play_stats["score"] + if "n" in self.self_play_stats: + eval_stats["eval/num_agents"] = self.self_play_stats["n"] + + if not eval_stats: + return + + self.logger.wandb.log(eval_stats) + + class SafeEvaluator: """Evaluates policies with fixed safe/law-abiding reward conditioning. @@ -829,7 +1101,7 @@ class SafeEvaluator: args dict — only the env_name, safe_eval config, and device. 
""" - def __init__(self, env_name: str, safe_eval_config: Dict, device="cuda", logger=None): + def __init__(self, env_name: str, safe_eval_config: Dict, device="cuda", logger=None, full_config=None): self.env_name = env_name self.logger = logger self.safe_eval_config = safe_eval_config @@ -840,6 +1112,38 @@ def __init__(self, env_name: str, safe_eval_config: Dict, device="cuda", logger= device = f"cuda:{device}" self.device = device self.stats = None + self.render_safe_eval = safe_eval_config.get("render_safe_eval", False) + # Resolve view modes from [eval].render_view_mode + from pufferlib.ocean.drive.drive import RenderView + + _VIEW_MODE_MAP = { + "sim_state": [RenderView.FULL_SIM_STATE], + "topdown": [RenderView.FULL_SIM_STATE], + "bev": [RenderView.BEV_AGENT_OBS], + "agent": [RenderView.BEV_AGENT_OBS], + "persp": [RenderView.AGENT_PERSP], + "both": [RenderView.FULL_SIM_STATE, RenderView.AGENT_PERSP], + "all": [RenderView.FULL_SIM_STATE, RenderView.AGENT_PERSP, RenderView.BEV_AGENT_OBS], + } + self._view_suffix = { + RenderView.FULL_SIM_STATE: "sim_state", + RenderView.AGENT_PERSP: "persp", + RenderView.BEV_AGENT_OBS: "bev", + } + view_mode_str = "sim_state" + if full_config is not None: + view_mode_str = ( + str(full_config.get("eval", {}).get("render_view_mode", "sim_state")).lower().strip('"').strip("'") + ) + self.render_view_modes = _VIEW_MODE_MAP.get(view_mode_str, [RenderView.FULL_SIM_STATE]) + + # Authoritative RNN flag comes from `full_config`, which PuffeRL passes + # in flattened form for this evaluator. In this code path the flag + # lives at `full_config["use_rnn"]` (no `train` nesting) — it is set + # during config loading from `rnn_name`. We previously used + # hasattr(policy, "hidden_size") as a proxy, which is fragile because + # non-RNN policies can also expose hidden_size. 
+ self.use_rnn = bool(full_config.get("use_rnn", False)) if full_config is not None else False def _build_eval_env_config(self): """Build env config with safe reward conditioning values applied. @@ -903,7 +1207,7 @@ def evaluate(self, vecenv, policy): policy.eval() num_agents = vecenv.observation_space.shape[0] - use_rnn = hasattr(policy, "hidden_size") + use_rnn = self.use_rnn ob, _ = vecenv.reset() state = {} @@ -952,6 +1256,76 @@ def evaluate(self, vecenv, policy): self.stats = {k: float(np.mean(v)) for k, v in all_stats.items() if len(v) > 0} return self.stats + def render(self, eval_config, policy): + """Run a single rollout with rendering enabled, one env per view mode. + + Always renders env index 0 (first env). Uses [eval].render_view_mode. + Produces .mp4 files on disk (flushed when each render_env is closed). + + Thin wrapper around :func:`pufferlib.ocean.drive.rollout.rollout_loop`. + """ + import copy + + from pufferlib.pufferl import load_env + from pufferlib.ocean.drive.rollout import RenderContext, rollout_loop + + multi_view = len(self.render_view_modes) > 1 + for view_mode in self.render_view_modes: + suffix = f"_{self._view_suffix[view_mode]}" if multi_view else "" + render_cfg = copy.deepcopy(eval_config) + render_cfg["env"]["render_mode"] = 1 + render_env = load_env(self.env_name, render_cfg) + try: + rollout_loop( + policy=policy, + env=render_env, + device=self.device, + use_rnn=self.use_rnn, + max_steps=self.episode_length, + render_ctx=RenderContext( + view_mode=view_mode, + env_id=0, + video_suffix=suffix, + ), + ) + finally: + render_env.close() + + def log_videos(self, epoch): + """Glob .mp4 files produced by render() and log them to wandb under render/safe_eval.""" + import os + import glob + + if not (self.logger and hasattr(self.logger, "wandb") and self.logger.wandb): + for p in glob.glob("*.mp4"): + os.remove(p) + return + + import wandb + + video_files = glob.glob("*.mp4") + if not video_files: + print("Warning: safe_eval render 
produced no mp4 files") + return + + multi_view = len(self.render_view_modes) > 1 + _known_suffixes = {"_sim_state", "_persp", "_bev"} + for p in video_files: + stem = os.path.splitext(os.path.basename(p))[0] + view_tag = "" + if multi_view: + for s in _known_suffixes: + if stem.endswith(s): + view_tag = s[1:] + stem = stem[: -len(s)] + break + caption = f"scene_{stem}_epoch_{epoch}_safe_eval" + wandb_key = f"render/safe_eval/{view_tag}" if view_tag else "render/safe_eval" + self.logger.wandb.log({wandb_key: wandb.Video(p, format="mp4", caption=caption)}) + + for p in video_files: + os.remove(p) + def log_stats(self, global_step=None): """Log collected metrics to wandb.""" if self.stats is None: diff --git a/pufferlib/ocean/drive/binding.c b/pufferlib/ocean/drive/binding.c index 75189f8dd..1db5ee0e9 100644 --- a/pufferlib/ocean/drive/binding.c +++ b/pufferlib/ocean/drive/binding.c @@ -264,6 +264,7 @@ static PyObject *my_shared(PyObject *self, PyObject *args, PyObject *kwargs) { free(env->active_agent_indices); free(env->static_agent_indices); free(env->expert_static_agent_indices); + free(env->tracks_to_predict_indices); free(env); Py_DECREF(agent_offsets); Py_DECREF(map_ids); @@ -285,6 +286,7 @@ static PyObject *my_shared(PyObject *self, PyObject *args, PyObject *kwargs) { free(env->active_agent_indices); free(env->static_agent_indices); free(env->expert_static_agent_indices); + free(env->tracks_to_predict_indices); free(env); continue; } @@ -369,6 +371,7 @@ static int my_init(Env *env, PyObject *args, PyObject *kwargs) { env->min_goal_distance = (float)unpack(kwargs, "min_goal_distance"); env->max_goal_distance = (float)unpack(kwargs, "max_goal_distance"); env->goal_radius = (float)unpack(kwargs, "goal_radius"); + env->render_mode = (int)unpack(kwargs, "render_mode"); env->min_goal_speed = (float)unpack(kwargs, "min_goal_speed"); env->max_goal_speed = (float)unpack(kwargs, "max_goal_speed"); env->min_avg_speed_to_consider_goal_attempt = (float)unpack(kwargs, 
"min_avg_speed_to_consider_goal_attempt"); diff --git a/pufferlib/ocean/drive/drive.c b/pufferlib/ocean/drive/drive.c index f3c1cd66b..57f38623e 100644 --- a/pufferlib/ocean/drive/drive.c +++ b/pufferlib/ocean/drive/drive.c @@ -20,8 +20,7 @@ void test_drivenet() { // Weights* weights = load_weights("resources/drive/puffer_drive_weights.bin"); Weights *weights = load_weights("puffer_drive_weights.bin"); - int reward_conditioning = 0; - DriveNet *net = init_drivenet(weights, num_agents, CLASSIC, reward_conditioning); + DriveNet *net = init_drivenet(weights, num_agents, CLASSIC, 1); forward(net, observations, actions); for (int i = 0; i < num_agents * num_actions; i++) { @@ -35,7 +34,7 @@ void test_drivenet() { free(weights); } -void demo() { +void demo(const char *map_name_arg, const char *policy_name_arg, int view_mode, int draw_traces) { // Read configuration from INI file env_init_config conf = {0}; const char *ini_file = "pufferlib/config/ocean/drive.ini"; @@ -83,18 +82,22 @@ void demo() { }; Drive env = { - .human_agent_idx = 0, - .action_type = 0, // Demo doesn't support continuous action space + .action_type = conf.action_type, .dynamics_model = conf.dynamics_model, .reward_vehicle_collision = conf.reward_vehicle_collision, .reward_offroad_collision = conf.reward_offroad_collision, + .reward_lane_align = conf.reward_lane_align, + .reward_lane_center = conf.reward_lane_center, .reward_goal = conf.reward_goal, .reward_goal_post_respawn = conf.reward_goal_post_respawn, .goal_radius = conf.goal_radius, + .min_goal_speed = conf.min_goal_speed, .goal_behavior = conf.goal_behavior, + .reward_randomization = conf.reward_randomization, + .reward_conditioning = conf.reward_conditioning, + .turn_off_normalization = conf.turn_off_normalization, .min_goal_distance = conf.min_goal_distance, .max_goal_distance = conf.max_goal_distance, - .min_goal_speed = conf.min_goal_speed, .max_goal_speed = conf.max_goal_speed, .dt = conf.dt, .episode_length = conf.episode_length, @@ 
-108,8 +111,27 @@ void demo() { .init_mode = conf.init_mode, .control_mode = conf.control_mode, .spawn_settings = spawn_settings, - .map_name = "resources/drive/binaries/carla_2D/map_001.bin", - .reward_conditioning = conf.reward_conditioning, + .reward_bounds = + { + {conf.reward_bound_goal_radius_min, conf.reward_bound_goal_radius_max}, + {conf.reward_bound_collision_min, conf.reward_bound_collision_max}, + {conf.reward_bound_offroad_min, conf.reward_bound_offroad_max}, + {conf.reward_bound_comfort_min, conf.reward_bound_comfort_max}, + {conf.reward_bound_lane_align_min, conf.reward_bound_lane_align_max}, + {conf.reward_bound_lane_center_min, conf.reward_bound_lane_center_max}, + {conf.reward_bound_velocity_min, conf.reward_bound_velocity_max}, + {conf.reward_bound_traffic_light_min, conf.reward_bound_traffic_light_max}, + {conf.reward_bound_center_bias_min, conf.reward_bound_center_bias_max}, + {conf.reward_bound_vel_align_min, conf.reward_bound_vel_align_max}, + {conf.reward_bound_overspeed_min, conf.reward_bound_overspeed_max}, + {conf.reward_bound_timestep_min, conf.reward_bound_timestep_max}, + {conf.reward_bound_reverse_min, conf.reward_bound_reverse_max}, + {conf.reward_bound_throttle_min, conf.reward_bound_throttle_max}, + {conf.reward_bound_steer_min, conf.reward_bound_steer_max}, + {conf.reward_bound_acc_min, conf.reward_bound_acc_max}, + }, + .map_name = "resources/drive/binaries/carla/carla_3D/map_001.bin", + .render_mode = RENDER_WINDOW, .partner_obs_radius = conf.partner_obs_radius, }; @@ -125,8 +147,8 @@ void demo() { return; } c_reset(&env); - c_render(&env); - Weights *weights = load_weights("resources/drive/puffer_drive_weights.bin"); + c_render(&env, view_mode, draw_traces); + Weights *weights = load_weights(policy_name_arg ? 
policy_name_arg : "resources/drive/puffer_drive_weights.bin"); DriveNet *net = init_drivenet(weights, env.active_agent_count, env.dynamics_model, env.reward_conditioning); int accel_delta = 1; @@ -192,13 +214,14 @@ void demo() { } c_step(&env); - c_render(&env); + c_render(&env, view_mode, draw_traces); } close_client(env.client); free_allocated(&env); free_drivenet(net); free(weights); + return; } void performance_test() { @@ -243,61 +266,19 @@ void performance_test() { } int main(int argc, char *argv[]) { - // Visualization-only parameters (not in [env] section) - int show_grid = 0; - int obs_only = 0; - int lasers = 0; - int show_human_logs = 0; - int frame_skip = 1; - int zoom_in = 0; - const char *view_mode = "both"; - - // File paths and num_maps (not in [env] section) const char *map_name = NULL; - const char *policy_name = "resources/drive/puffer_drive_weights.bin"; - const char *output_topdown = NULL; - const char *output_agent = NULL; - int num_maps = 1; + const char *policy_name = NULL; + int view_mode = VIEW_MODE_SIM_STATE; // Default: full sim-state bird's-eye view + int draw_traces = 1; // Default: show logged trajectories // Parse command line arguments for (int i = 1; i < argc; i++) { - if (strcmp(argv[i], "--show-grid") == 0) { - show_grid = 1; - } else if (strcmp(argv[i], "--obs-only") == 0) { - obs_only = 1; - } else if (strcmp(argv[i], "--lasers") == 0) { - lasers = 1; - } else if (strcmp(argv[i], "--log-trajectories") == 0) { - show_human_logs = 1; - } else if (strcmp(argv[i], "--frame-skip") == 0) { - if (i + 1 < argc) { - frame_skip = atoi(argv[i + 1]); - i++; - if (frame_skip <= 0) { - frame_skip = 1; - } - } - } else if (strcmp(argv[i], "--zoom-in") == 0) { - zoom_in = 1; - } else if (strcmp(argv[i], "--view") == 0) { - if (i + 1 < argc) { - view_mode = argv[i + 1]; - i++; - if (strcmp(view_mode, "both") != 0 && strcmp(view_mode, "topdown") != 0 && - strcmp(view_mode, "agent") != 0) { - fprintf(stderr, "Error: --view must be 'both', 
'topdown', or 'agent'\n"); - return 1; - } - } else { - fprintf(stderr, "Error: --view option requires a value (both/topdown/agent)\n"); - return 1; - } - } else if (strcmp(argv[i], "--map-name") == 0) { + if (strcmp(argv[i], "--map-name") == 0) { if (i + 1 < argc) { map_name = argv[i + 1]; i++; } else { - fprintf(stderr, "Error: --map-name option requires a map file path\n"); + fprintf(stderr, "Error: --map-name requires a map file path\n"); return 1; } } else if (strcmp(argv[i], "--policy-name") == 0) { @@ -305,30 +286,34 @@ int main(int argc, char *argv[]) { policy_name = argv[i + 1]; i++; } else { - fprintf(stderr, "Error: --policy-name option requires a policy file path\n"); + fprintf(stderr, "Error: --policy-name requires a policy file path\n"); return 1; } - } else if (strcmp(argv[i], "--output-topdown") == 0) { - if (i + 1 < argc) { - output_topdown = argv[i + 1]; - i++; - } - } else if (strcmp(argv[i], "--output-agent") == 0) { - if (i + 1 < argc) { - output_agent = argv[i + 1]; - i++; - } - } else if (strcmp(argv[i], "--num-maps") == 0) { + } else if (strcmp(argv[i], "--view") == 0) { if (i + 1 < argc) { - num_maps = atoi(argv[i + 1]); + const char *v = argv[i + 1]; i++; + if (strcmp(v, "sim_state") == 0 || strcmp(v, "topdown") == 0) { + view_mode = VIEW_MODE_SIM_STATE; + } else if (strcmp(v, "bev") == 0 || strcmp(v, "agent") == 0) { + view_mode = VIEW_MODE_BEV_AGENT_OBS; + } else if (strcmp(v, "persp") == 0) { + view_mode = VIEW_MODE_AGENT_PERSP; + } else { + fprintf(stderr, "Error: --view must be 'sim_state', 'bev', 'persp', or 'zoom_out'\n"); + return 1; + } + } else { + fprintf(stderr, "Error: --view requires a value (sim_state/bev/persp/zoom_out)\n"); + return 1; } + } else if (strcmp(argv[i], "--no-traces") == 0) { + draw_traces = 0; } } // performance_test(); - demo(map_name, policy_name, show_grid, obs_only, lasers, show_human_logs, frame_skip, view_mode, output_topdown, - output_agent, num_maps, zoom_in); + demo(map_name, policy_name, view_mode, 
draw_traces); // test_drivenet(); return 0; } diff --git a/pufferlib/ocean/drive/drive.h b/pufferlib/ocean/drive/drive.h index 7953617eb..741a9307e 100644 --- a/pufferlib/ocean/drive/drive.h +++ b/pufferlib/ocean/drive/drive.h @@ -1,3 +1,5 @@ +#include +#include #include #include #include @@ -13,6 +15,14 @@ #include "error.h" #include "datatypes.h" +#define RENDER_WINDOW 0 +#define RENDER_HEADLESS 1 + +// View modes +#define VIEW_MODE_SIM_STATE 0 +#define VIEW_MODE_BEV_AGENT_OBS 1 +#define VIEW_MODE_AGENT_PERSP 2 + // constants for strings, data etc. #define SCENARIO_ID_STR_LENGTH 16 @@ -200,6 +210,13 @@ const Color PUFF_BACKGROUND2 = (Color){18, 72, 72, 255}; const Color LIGHTGREEN = (Color){152, 255, 152, 255}; const Color LIGHTYELLOW = (Color){255, 255, 152, 255}; const Color SOFT_YELLOW = (Color){245, 245, 220, 255}; +const Color ROAD_COLOR = (Color){35, 35, 37, 255}; +const Color LIGHTBLUE = (Color){167, 204, 255, 255}; +const Color DEEPBLUE = (Color){45, 112, 226, 255}; +const Color EXPERT_REPLAY = (Color){162, 220, 183, 255}; +const Color EXPERT_REPLAY_SMALL = (Color){95, 112, 93, 255}; +const Color LIGHT_ORANGE = (Color){255, 160, 80, 255}; +const Color LIGHT_PURPLE = (Color){204, 204, 255, 255}; struct timespec ts; @@ -353,6 +370,7 @@ struct Drive { float max_goal_distance; char *ini_file; char scenario_id[SCENARIO_ID_STR_LENGTH]; + char video_suffix[64]; // Optional suffix appended to mp4 filename (e.g. "_bev") int collision_behavior; int offroad_behavior; float observation_window_size; @@ -366,6 +384,7 @@ struct Drive { int reward_randomization; int reward_conditioning; int turn_off_normalization; + int render_mode; RewardBound reward_bounds[NUM_REWARD_COEFS]; float min_avg_speed_to_consider_goal_attempt; float partner_obs_radius; @@ -1068,6 +1087,24 @@ void load_map_binary(const char *filename, Drive *env) { if (!file) return; + // Populate scenario_id from the filename stem (e.g. 
"/path/to/abc123def456.bin" → "abc123def456") + // This is the string used by make_client to name the output mp4. + { + const char *base = filename; + // Find last '/' or '\\' + for (const char *p = filename; *p; p++) { + if (*p == '/' || *p == '\\') + base = p + 1; + } + // Copy up to SCENARIO_ID_STR_LENGTH chars, stopping at '.' or end + int i = 0; + while (i < SCENARIO_ID_STR_LENGTH - 1 && base[i] && base[i] != '.') { + env->scenario_id[i] = base[i]; + i++; + } + env->scenario_id[i] = '\0'; + } + // Read sdc_track_index fread(&env->sdc_track_index, sizeof(int), 1, file); @@ -1561,7 +1598,7 @@ static bool check_offroad(Drive *env, Agent *agent) { RoadMapElement *entity; entity = &env->road_elements[entity_list[i].entity_idx]; - // Check for offroad collision with road edges + // Check for offroad collision with road edges (only for vehicles and cyclists) if (entity->type == ROAD_EDGE) { int geometry_idx = entity_list[i].geometry_idx; if (entity->z[geometry_idx] > agent->sim_z + agent->sim_height / 2.0f || @@ -2054,7 +2091,6 @@ int spawn_active_agents(Drive *env, int num_agents_to_create) { void spawn_agents_with_counts(Drive *env) { // Currently only creates active agents int num_agents_to_create = env->num_agents; - int successfully_created = spawn_active_agents(env, num_agents_to_create); env->num_created_agents = successfully_created; @@ -2261,7 +2297,13 @@ void init(Drive *env) { env->logs = (Log *)calloc(env->active_agent_count, sizeof(Log)); } +void close_client(Client *client); + void c_close(Drive *env) { + if (env->client != NULL) { + close_client(env->client); + env->client = NULL; + } free_agents(env->agents, env->num_objects); for (int i = 0; i < env->num_roads; i++) { free_road_element(&env->road_elements[i]); @@ -2290,6 +2332,7 @@ void c_close(Drive *env) { free(env->static_agent_indices); free(env->expert_static_agent_indices); free(env->tracks_to_predict_indices); + env->tracks_to_predict_indices = NULL; free(env->ini_file); } @@ -2587,10 +2630,11 
@@ void compute_agent_metrics(Drive *env, int agent_idx) { agent->metrics_array[SPEED_LIMIT_IDX] = (speed_magnitude > SPEED_LIMIT + 2.0f) ? 1.0f : 0.0f; // Check for vehicle collisions - int car_collided_with_index = collision_check(env, agent_idx); - if (car_collided_with_index != -1) + int car_collided_with_index = -1; + car_collided_with_index = collision_check(env, agent_idx); + if (car_collided_with_index != -1) { collided = VEHICLE_COLLISION; - + } agent->collision_state = collided; if (collided == VEHICLE_COLLISION) { @@ -3483,16 +3527,95 @@ struct Client { int car_assignments[MAX_AGENTS]; // To keep car model assignments consistent per vehicle Vector3 default_camera_position; Vector3 default_camera_target; + int recorder_pipefd[2]; + pid_t recorder_pid; + pid_t xvfb_pid; + int xvfb_display_num; }; Client *make_client(Drive *env) { Client *client = (Client *)calloc(1, sizeof(Client)); - client->width = 1280; - client->height = 704; - SetConfigFlags(FLAG_MSAA_4X_HINT); + + if (env->render_mode == RENDER_HEADLESS && getenv("DISPLAY") == NULL) { + + // Use a per-process display number so multiple rendering jobs on the same + // node don't collide. Range :100-:999 avoids the system default :0. 
+ client->xvfb_display_num = 100 + (getpid() % 900); + + char lock_file[32], socket_file[32], display_str[16]; + snprintf(display_str, sizeof(display_str), ":%d", client->xvfb_display_num); + snprintf(lock_file, sizeof(lock_file), "/tmp/.X%d-lock", client->xvfb_display_num); + snprintf(socket_file, sizeof(socket_file), "/tmp/.X11-unix/X%d", client->xvfb_display_num); + + // Clean up a stale lock only if the owning process is already dead + FILE *f = fopen(lock_file, "r"); + if (f) { + pid_t pid = -1; + fscanf(f, "%d", &pid); + fclose(f); + if (pid > 0 && kill(pid, 0) != 0) { + unlink(lock_file); + unlink(socket_file); + } + } + + client->xvfb_pid = fork(); + if (client->xvfb_pid == 0) { + close(STDOUT_FILENO); + close(STDERR_FILENO); + execlp("Xvfb", "Xvfb", display_str, "-screen", "0", "1280x720x24", "+extension", "GLX", "-ac", "-noreset", + NULL); + _exit(1); + } + + setenv("DISPLAY", display_str, 1); + // Xvfb starts asynchronously after fork(), so we poll until it creates its + // lock file (max 2s) then wait an extra 200ms for GLX to finish initializing. + // Without this, raylib's InitWindow() would try to connect before Xvfb is ready. 
+ for (int i = 0; i < 20 && access(lock_file, F_OK) != 0; i++) + usleep(100000); + usleep(200000); + } + + if (env->render_mode == RENDER_WINDOW) { + client->width = 1280; + client->height = 704; + SetConfigFlags(FLAG_MSAA_4X_HINT); + SetTargetFPS(30); + + // Set up camera for interactive window + Vector3 target_pos = {0, 0, 1}; // Y is up, Z is depth + + client->default_camera_position = (Vector3){ + 0, // Same X as target + 120.0f, // 120 units above target + 175.0f // 175 units behind target + }; + client->default_camera_target = target_pos; + client->camera.position = client->default_camera_position; + client->camera.target = client->default_camera_target; + client->camera.up = (Vector3){0.0f, -1.0f, 0.0f}; // Y is up + client->camera.fovy = 45.0f; + client->camera.projection = CAMERA_PERSPECTIVE; + + } else { // Headless rendering + SetConfigFlags(FLAG_WINDOW_HIDDEN); + SetTargetFPS(6000); + + float map_width = env->grid_map->bottom_right_x - env->grid_map->top_left_x; + float map_height = env->grid_map->top_left_y - env->grid_map->bottom_right_y; + float scale = 6.0f; // Controls the resolution of the output video + int img_width = (int)roundf(map_width * scale / 2.0f) * 2; + int img_height = (int)roundf(map_height * scale / 2.0f) * 2; + + client->width = img_width; + client->height = img_height; + } + + SetTraceLogLevel(LOG_WARNING); // Only show warnings and errors InitWindow(client->width, client->height, "PufferDrive"); - SetTargetFPS(30); - client->puffers = LoadTexture("resources/puffers_128.png"); + + // Load assets client->cars[0] = LoadModel("resources/drive/RedCar.glb"); client->cars[1] = LoadModel("resources/drive/WhiteCar.glb"); client->cars[2] = LoadModel("resources/drive/BlueCar.glb"); @@ -3506,26 +3629,46 @@ Client *make_client(Drive *env) { for (int i = 0; i < MAX_AGENTS; i++) { client->car_assignments[i] = (rand() % 4) + 1; } - // Get initial target position from first active agent - Vector3 target_pos = { - 0, - 0, // Y is up - 1 // Z is depth
- }; - // Set up camera to look at target from above and behind - client->default_camera_position = (Vector3){ - 0, // Same X as target - 120.0f, // 20 units above target - 40.0f // 20 units behind target - }; - client->default_camera_target = target_pos; - client->camera.position = client->default_camera_position; - client->camera.target = client->default_camera_target; - client->camera.up = (Vector3){0.0f, -1.0f, 0.0f}; // Y is up - client->camera.fovy = 45.0f; - client->camera.projection = CAMERA_PERSPECTIVE; - client->camera_zoom = 1.0f; + // Set up ffmpeg process for recording + if (env->render_mode == RENDER_HEADLESS) { + if (pipe(client->recorder_pipefd) == -1) { + fprintf(stderr, "Failed to create pipe\n"); + free(client); + return NULL; + } + + char size_str[64]; + snprintf(size_str, sizeof(size_str), "%dx%d", (int)client->width, (int)client->height); + + char filename[320]; + if (env->video_suffix[0] != '\0') + snprintf(filename, sizeof(filename), "%s%s.mp4", env->scenario_id, env->video_suffix); + else + snprintf(filename, sizeof(filename), "%s.mp4", env->scenario_id); + + client->recorder_pid = fork(); + if (client->recorder_pid == -1) { + fprintf(stderr, "Failed to fork\n"); + free(client); + return NULL; + } + + if (client->recorder_pid == 0) { // Child process + close(client->recorder_pipefd[1]); + dup2(client->recorder_pipefd[0], STDIN_FILENO); + close(client->recorder_pipefd[0]); + for (int fd = 3; fd < 256; fd++) + close(fd); + execlp("ffmpeg", "ffmpeg", "-y", "-f", "rawvideo", "-pix_fmt", "rgba", "-s", size_str, "-r", "30", "-i", + "-", "-c:v", "libx264", "-pix_fmt", "yuv420p", "-preset", "ultrafast", "-crf", "23", "-loglevel", + "error", filename, NULL); + fprintf(stderr, "execlp ffmpeg failed\n"); + _exit(1); + } + close(client->recorder_pipefd[0]); + } + return client; } @@ -3632,20 +3775,27 @@ void draw_agent_obs(Drive *env, int agent_index, int mode, int obs_only, int las float goal_y = agent_obs[1] * 200; float goal_z = agent_obs[2] * 200; 
+ int agent_type = env->agents[active_idx].type; + Color goal_color = LIGHTBLUE; + if (agent_type == PEDESTRIAN) + goal_color = LIGHT_ORANGE; + else if (agent_type == CYCLIST) + goal_color = LIGHT_PURPLE; + if (mode == 0) { - DrawSphere((Vector3){goal_x, goal_y, goal_z}, 0.5f, LIGHTGREEN); + DrawSphere((Vector3){goal_x, goal_y, goal_z}, 0.5f, goal_color); DrawCircle3D((Vector3){goal_x, goal_y, goal_z}, env->agents[active_idx].reward_coefs[REWARD_COEF_GOAL_RADIUS], - (Vector3){0, 0, 1}, 90.0f, Fade(LIGHTGREEN, 0.3f)); + (Vector3){0, 0, 1}, 90.0f, Fade(goal_color, 0.3f)); } if (mode == 1) { float goal_x_world = px + (goal_x * heading_self_x - goal_y * heading_self_y); float goal_y_world = py + (goal_x * heading_self_y + goal_y * heading_self_x); float goal_z_world = pz + goal_z; - DrawSphere((Vector3){goal_x_world, goal_y_world, goal_z_world}, 0.5f, LIGHTGREEN); + DrawSphere((Vector3){goal_x_world, goal_y_world, goal_z_world}, 0.5f, goal_color); DrawCircle3D((Vector3){goal_x_world, goal_y_world, goal_z_world}, env->agents[active_idx].reward_coefs[REWARD_COEF_GOAL_RADIUS], (Vector3){0, 0, 1}, 90.0f, - Fade(LIGHTGREEN, 0.3f)); + Fade(goal_color, 0.3f)); } // First draw other agent observations int obs_idx = ego_dim; // Start after ego obs @@ -3955,27 +4105,39 @@ void draw_scene(Drive *env, Client *client, int mode, int obs_only, int lasers, continue; } - // --- Draw the car --- - Color car_color = GRAY; // default for static - if (is_expert) - car_color = GOLD; // expert replay - if (is_active_agent) - car_color = BLUE; // policy-controlled - if (is_active_agent && agent->aabb_collision_state > 0) - car_color = LIGHTGREEN; - if (is_active_agent && agent->collision_state > 0) - car_color = RED; - rlSetLineWidth(3.0f); - for (int j = 0; j < 4; j++) { - DrawLine3D(corners[j], corners[(j + 1) % 4], car_color); + // Draw the agent bounding boxes + Color agent_color = GRAY; + if (is_expert) { + if (agent->type == PEDESTRIAN || agent->type == CYCLIST) + agent_color = 
EXPERT_REPLAY_SMALL; + else + agent_color = EXPERT_REPLAY; } + if (is_active_agent) { + if (agent->type == PEDESTRIAN) + agent_color = LIGHT_ORANGE; + else if (agent->type == CYCLIST) + agent_color = LIGHT_PURPLE; + else + agent_color = BLUE; + } + if (is_active_agent && agent->collision_state > 0) + agent_color = RED; + + rlPushMatrix(); + rlTranslatef(position.x, position.y, position.z); + rlRotatef(heading * RAD2DEG, 0.0f, 0.0f, 1.0f); + DrawCube((Vector3){0.0f, 0.0f, 0.0f}, size.x, size.y, 1.0f, Fade(agent_color, 0.5f)); + DrawCubeWires((Vector3){0.0f, 0.0f, 0.0f}, size.x, size.y, 1.0f, agent_color); + rlPopMatrix(); + // --- Draw a heading arrow pointing forward --- Vector3 arrowStart = position; Vector3 arrowEnd = {position.x + cos_heading * half_len * 1.5f, // extend arrow beyond car position.y + sin_heading * half_len * 1.5f, position.z}; - DrawLine3D(arrowStart, arrowEnd, car_color); - DrawSphere(arrowEnd, 0.2f, car_color); // arrow tip + DrawLine3D(arrowStart, arrowEnd, agent_color); + DrawSphere(arrowEnd, 0.2f, agent_color); // arrow tip } else { // Agent view rlPushMatrix(); @@ -4041,7 +4203,7 @@ void draw_scene(Drive *env, Client *client, int mode, int obs_only, int lasers, }; Color wire_color = GRAY; // static if (!is_active_agent && agent->mark_as_expert == 1) - wire_color = GOLD; // expert replay + wire_color = EXPERT_REPLAY; // expert replay if (is_active_agent) wire_color = BLUE; // policy if (is_active_agent && agent->aabb_collision_state > 0) @@ -4077,13 +4239,20 @@ void draw_scene(Drive *env, Client *client, int mode, int obs_only, int lasers, continue; } if (!IsKeyDown(KEY_LEFT_CONTROL) && obs_only == 0) { + + Color goal_color = DEEPBLUE; + if (agent->type == PEDESTRIAN) + goal_color = LIGHT_ORANGE; + else if (agent->type == CYCLIST) + goal_color = LIGHT_PURPLE; + DrawSphere( (Vector3){ agent->goal_position_x, agent->goal_position_y, agent->goal_position_z, }, - 0.5f, DARKGREEN); + 0.5f, goal_color); DrawCircle3D( (Vector3){ @@ -4091,7 +4260,7 
@@ void draw_scene(Drive *env, Client *client, int mode, int obs_only, int lasers, agent->goal_position_y, agent->goal_position_z, }, - agent->reward_coefs[REWARD_COEF_GOAL_RADIUS], (Vector3){0, 0, 1}, 90.0f, Fade(LIGHTGREEN, 0.3f)); + agent->reward_coefs[REWARD_COEF_GOAL_RADIUS], (Vector3){0, 0, 1}, 90.0f, Fade(goal_color, 0.3f)); } } } @@ -4108,7 +4277,7 @@ void draw_scene(Drive *env, Client *client, int mode, int obs_only, int lasers, else if (road->type == ROAD_LINE) lineColor = WHITE; else if (road->type == ROAD_EDGE) - lineColor = WHITE; + lineColor = Fade(WHITE, 0.7f); else if (road->type == DRIVEWAY) lineColor = RED; @@ -4155,97 +4324,195 @@ void draw_scene(Drive *env, Client *client, int mode, int obs_only, int lasers, } } -void c_render(Drive *env) { +void c_render(Drive *env, int view_mode, int draw_traces) { + + // Create client on first render call if (env->client == NULL) { env->client = make_client(env); } Client *client = env->client; - BeginDrawing(); - Color road = (Color){35, 35, 37, 255}; - ClearBackground(road); - BeginMode3D(client->camera); - handle_camera_controls(env->client); - draw_scene(env, client, 0, 0, 0, 0); - if (IsKeyPressed(KEY_TAB) && env->active_agent_count > 0) { - env->human_agent_idx = (env->human_agent_idx + 1) % env->active_agent_count; - } + if (env->render_mode == RENDER_HEADLESS) { // Headless rendering via ffmpeg + float map_width = env->grid_map->bottom_right_x - env->grid_map->top_left_x; + float map_height = env->grid_map->top_left_y - env->grid_map->bottom_right_y; - // Draw debug info - DrawText(TextFormat("Camera Position: (%.2f, %.2f, %.2f)", client->camera.position.x, client->camera.position.y, - client->camera.position.z), - 10, 10, 20, PUFF_WHITE); - DrawText(TextFormat("Camera Target: (%.2f, %.2f, %.2f)", client->camera.target.x, client->camera.target.y, - client->camera.target.z), - 10, 30, 20, PUFF_WHITE); - DrawText(TextFormat("Timestep: %d", env->timestep), 10, 50, 20, PUFF_WHITE); + Camera3D camera = 
{0}; + + if (view_mode == VIEW_MODE_SIM_STATE) { + // Orthographic bird's-eye view over the entire map (fully observable) + camera.position = (Vector3){0.0, 0.0, 400.0f}; // Above the scene + camera.target = (Vector3){0.0, 0.0, 0.0}; // Look at origin + camera.up = (Vector3){0.0f, -1.0f, 0.0f}; + camera.projection = CAMERA_ORTHOGRAPHIC; + camera.fovy = map_height; + + BeginDrawing(); + ClearBackground(ROAD_COLOR); + BeginMode3D(camera); + + if (draw_traces) { // Show logged trajectories of active agents and expert static agents + for (int i = 0; i < env->active_agent_count; i++) { + int idx = env->active_agent_indices[i]; + for (int t = env->init_steps; t < env->episode_length; t++) { + Color agent_color = LIGHTBLUE; + if (env->agents[idx].type == PEDESTRIAN) { + agent_color = LIGHT_ORANGE; + } else if (env->agents[idx].type == CYCLIST) { + agent_color = LIGHT_PURPLE; + } + DrawSphere((Vector3){env->agents[idx].log_trajectory_x[t], env->agents[idx].log_trajectory_y[t], + env->agents[idx].log_trajectory_z[t]}, + 0.15f, agent_color); + } + } - int human_idx = env->active_agent_indices[env->human_agent_idx]; - DrawText(TextFormat("Controlling Agent: %d", env->human_agent_idx), 10, 70, 20, PUFF_WHITE); - DrawText(TextFormat("Agent Index: %d", human_idx), 10, 90, 20, PUFF_WHITE); + for (int i = 0; i < env->expert_static_agent_count; i++) { + int idx = env->expert_static_agent_indices[i]; + for (int t = env->init_steps; t < env->episode_length; t++) { + DrawSphere((Vector3){env->agents[idx].log_trajectory_x[t], env->agents[idx].log_trajectory_y[t], + env->agents[idx].log_trajectory_z[t]}, + 0.15f, EXPERT_REPLAY); + } + } + } - // Display current action values - yellow when controlling, white otherwise - Color action_color = IsKeyDown(KEY_LEFT_SHIFT) ? 
YELLOW : PUFF_WHITE; + draw_scene(env, client, 1, 0, 0, 0); - if (env->action_type == 0) { // discrete - int *action_array = (int *)env->actions; - int action_val = action_array[env->human_agent_idx]; + } else if (view_mode == VIEW_MODE_BEV_AGENT_OBS) { + // Orthographic bird's-eye view centered on the selected agent, + // showing only that agent's observations + int agent_idx = env->active_agent_indices[env->human_agent_idx]; + Agent *agent = &env->agents[agent_idx]; - if (env->dynamics_model == CLASSIC) { - int num_steer = 13; - int accel_idx = action_val / num_steer; - int steer_idx = action_val % num_steer; - float accel_value = ACCELERATION_VALUES[accel_idx]; - float steer_value = STEERING_VALUES[steer_idx]; - - DrawText(TextFormat("Acceleration: %.2f m/s^2", accel_value), 10, 110, 20, action_color); - DrawText(TextFormat("Steering: %.3f", steer_value), 10, 130, 20, action_color); - } else if (env->dynamics_model == JERK) { - int num_lat = 3; - int jerk_long_idx = action_val / num_lat; - int jerk_lat_idx = action_val % num_lat; - float jerk_long_value = JERK_LONG[jerk_long_idx]; - float jerk_lat_value = JERK_LAT[jerk_lat_idx]; - - DrawText(TextFormat("Longitudinal Jerk: %.2f m/s^3", jerk_long_value), 10, 110, 20, action_color); - DrawText(TextFormat("Lateral Jerk: %.2f m/s^3", jerk_lat_value), 10, 130, 20, action_color); + Camera3D camera = {0}; + camera.position = (Vector3){agent->sim_x, agent->sim_y, 400.0f}; + camera.target = (Vector3){agent->sim_x, agent->sim_y, 0.0f}; + camera.up = (Vector3){0.0f, -1.0f, 0.0f}; + camera.projection = CAMERA_ORTHOGRAPHIC; + camera.fovy = env->grid_map->vision_range * GRID_CELL_SIZE * 2.0f; + + BeginDrawing(); + ClearBackground(ROAD_COLOR); + BeginMode3D(camera); + draw_scene(env, client, 1, 1, 0, 0); + + } else { // First-person perspective from a selected agent + int agent_idx = env->active_agent_indices[env->human_agent_idx]; + Agent *agent = &env->agents[agent_idx]; + + Camera3D camera = {0}; + // Position camera behind 
and above the agent + camera.position = (Vector3){agent->sim_x - (25.0f * cosf(agent->sim_heading)), + agent->sim_y - (25.0f * sinf(agent->sim_heading)), 15.0f}; + camera.target = (Vector3){agent->sim_x + 40.0f * cosf(agent->sim_heading), + agent->sim_y + 40.0f * sinf(agent->sim_heading), 1.0f}; + camera.up = (Vector3){0.0f, 0.0f, 1.0f}; + camera.fovy = 60.0f; + camera.projection = CAMERA_PERSPECTIVE; + + BeginDrawing(); + ClearBackground(ROAD_COLOR); + BeginMode3D(camera); + draw_scene(env, client, 0, 0, 0, 1); } - } else { // continuous - float (*action_array_f)[2] = (float (*)[2])env->actions; - DrawText(TextFormat("Acceleration: %.2f", action_array_f[env->human_agent_idx][0]), 10, 110, 20, action_color); - DrawText(TextFormat("Steering: %.2f", action_array_f[env->human_agent_idx][1]), 10, 130, 20, action_color); - } - // Show key press status - int status_y = 150; - if (IsKeyDown(KEY_LEFT_SHIFT)) { - DrawText("[shift pressed]", 10, status_y, 20, YELLOW); - status_y += 20; - } - if (IsKeyDown(KEY_SPACE)) { - DrawText("[space pressed]", 10, status_y, 20, YELLOW); - status_y += 20; - } - if (IsKeyDown(KEY_LEFT_CONTROL)) { - DrawText("[ctrl pressed]", 10, status_y, 20, YELLOW); - status_y += 20; - } + EndDrawing(); + + unsigned char *screen_data = rlReadScreenPixels((int)client->width, (int)client->height); + if (screen_data) { + write(client->recorder_pipefd[1], screen_data, (int)client->width * (int)client->height * 4); + RL_FREE(screen_data); + } + } else { // Pop-up window + BeginDrawing(); + ClearBackground(ROAD_COLOR); + BeginMode3D(client->camera); + handle_camera_controls(env->client); + draw_scene(env, client, 0, 0, 0, 0); + + if (IsKeyPressed(KEY_TAB) && env->active_agent_count > 0) { + env->human_agent_idx = (env->human_agent_idx + 1) % env->active_agent_count; + } + + DrawText(TextFormat("Timestep: %d", env->timestep), 10, 50, 20, PUFF_WHITE); + DrawText(TextFormat("Controlling agent: %d", env->human_agent_idx), 10, 70, 20, PUFF_WHITE); + int human_idx 
= env->active_agent_indices[env->human_agent_idx]; + + Color action_color = IsKeyDown(KEY_LEFT_SHIFT) ? YELLOW : PUFF_WHITE; + + if (env->action_type == 0) { // discrete + int *action_array = (int *)env->actions; + int action_val = action_array[env->human_agent_idx]; + + if (env->dynamics_model == CLASSIC) { + int num_steer = 13; + int accel_idx = action_val / num_steer; + int steer_idx = action_val % num_steer; + float accel_value = ACCELERATION_VALUES[accel_idx]; + float steer_value = STEERING_VALUES[steer_idx]; + + DrawText(TextFormat("Acceleration: %.2f m/s^2", accel_value), 10, 110, 20, action_color); + DrawText(TextFormat("Steering: %.3f", steer_value), 10, 130, 20, action_color); + } else if (env->dynamics_model == JERK) { + int num_lat = 3; + int jerk_long_idx = action_val / num_lat; + int jerk_lat_idx = action_val % num_lat; + float jerk_long_value = JERK_LONG[jerk_long_idx]; + float jerk_lat_value = JERK_LAT[jerk_lat_idx]; + + DrawText(TextFormat("Longitudinal Jerk: %.2f m/s^3", jerk_long_value), 10, 110, 20, action_color); + DrawText(TextFormat("Lateral Jerk: %.2f m/s^3", jerk_lat_value), 10, 130, 20, action_color); + } + } else { // continuous + float (*action_array_f)[2] = (float (*)[2])env->actions; + DrawText(TextFormat("Acceleration: %.2f", action_array_f[env->human_agent_idx][0]), 10, 110, 20, + action_color); + DrawText(TextFormat("Steering: %.2f", action_array_f[env->human_agent_idx][1]), 10, 130, 20, action_color); + } - // Controls help - DrawText("Controls: SHIFT + W/S - Accelerate/Brake, SHIFT + A/D - Steer, TAB - Switch Agent", 10, - client->height - 30, 20, PUFF_WHITE); + int status_y = 150; + if (IsKeyDown(KEY_LEFT_SHIFT)) { + DrawText("[shift pressed]", 10, status_y, 20, YELLOW); + status_y += 20; + } + if (IsKeyDown(KEY_SPACE)) { + DrawText("[space pressed]", 10, status_y, 20, YELLOW); + status_y += 20; + } + if (IsKeyDown(KEY_LEFT_CONTROL)) { + DrawText("[ctrl pressed]", 10, status_y, 20, YELLOW); + status_y += 20; + } - 
DrawText(TextFormat("Grid Rows: %d", env->grid_map->grid_rows), 10, status_y, 20, PUFF_WHITE); - DrawText(TextFormat("Grid Cols: %d", env->grid_map->grid_cols), 10, status_y + 20, 20, PUFF_WHITE); - EndDrawing(); + DrawText("Controls: SHIFT + W/S - Accelerate/Brake, SHIFT + A/D - Steer, TAB - Switch Agent", 10, + client->height - 30, 20, PUFF_WHITE); + DrawText(TextFormat("Grid Rows: %d", env->grid_map->grid_rows), 10, status_y, 20, PUFF_WHITE); + DrawText(TextFormat("Grid Cols: %d", env->grid_map->grid_cols), 10, status_y + 20, 20, PUFF_WHITE); + EndDrawing(); + } } void close_client(Client *client) { - for (int i = 0; i < 6; i++) { - UnloadModel(client->cars[i]); + if (client->recorder_pid > 0) { + close(client->recorder_pipefd[1]); + waitpid(client->recorder_pid, NULL, 0); } - UnloadTexture(client->puffers); + for (int i = 0; i < 6; i++) + UnloadModel(client->cars[i]); + UnloadModel(client->cyclist); + UnloadModel(client->pedestrian); CloseWindow(); + if (client->xvfb_pid > 0) { + kill(client->xvfb_pid, SIGTERM); + waitpid(client->xvfb_pid, NULL, 0); + char lock_file[32], socket_file[32]; + snprintf(lock_file, sizeof(lock_file), "/tmp/.X%d-lock", client->xvfb_display_num); + snprintf(socket_file, sizeof(socket_file), "/tmp/.X11-unix/X%d", client->xvfb_display_num); + unlink(lock_file); + unlink(socket_file); + unsetenv("DISPLAY"); + } + free(client); } diff --git a/pufferlib/ocean/drive/drive.py b/pufferlib/ocean/drive/drive.py index 7e9324692..46726a942 100644 --- a/pufferlib/ocean/drive/drive.py +++ b/pufferlib/ocean/drive/drive.py @@ -4,15 +4,22 @@ import struct import os import pufferlib +from enum import IntEnum from pufferlib.ocean.drive import binding from multiprocessing import Pool, cpu_count from tqdm import tqdm +class RenderView(IntEnum): + FULL_SIM_STATE = 0 # Orthographic top-down, fully observable simulator state (zoomed in, origin-centered) + BEV_AGENT_OBS = 1 # Orthographic top-down, only show what the selected agent can observe + AGENT_PERSP = 
2 # Third-person perspective following selected agent + + class Drive(pufferlib.PufferEnv): def __init__( self, - render_mode=None, + render_mode=RenderView.FULL_SIM_STATE, report_interval=1, width=1280, height=1024, @@ -345,7 +352,7 @@ def __init__( self.map_ids = map_ids self.num_envs = num_envs super().__init__(buf=buf) - env_ids = [] + self.env_ids = [] for i in range(num_envs): cur = agent_offsets[i] nxt = agent_offsets[i + 1] @@ -430,10 +437,11 @@ def __init__( spawn_length_min=self.spawn_length_min, spawn_length_max=self.spawn_length_max, spawn_height=self.spawn_height, + render_mode=render_mode, ) - env_ids.append(env_id) + self.env_ids.append(env_id) - self.c_envs = binding.vectorize(*env_ids) + self.c_envs = binding.vectorize(*self.env_ids) def reset(self, seed=0): binding.vec_reset(self.c_envs, seed) @@ -508,7 +516,7 @@ def resample_maps(self): self.agent_offsets = agent_offsets self.map_ids = map_ids self.num_envs = num_envs - env_ids = [] + self.env_ids = [] seed = np.random.randint(0, 2**32 - 1) for i in range(num_envs): cur = agent_offsets[i] @@ -593,23 +601,29 @@ def resample_maps(self): spawn_length_min=self.spawn_length_min, spawn_length_max=self.spawn_length_max, spawn_height=self.spawn_height, + render_mode=self.render_mode, ) - env_ids.append(env_id) - self.c_envs = binding.vectorize(*env_ids) + self.env_ids.append(env_id) + self.c_envs = binding.vectorize(*self.env_ids) binding.vec_reset(self.c_envs, seed) self.truncations[:] = 1 - def step(self, actions): + def step(self, actions, per_env_logs=False): self.terminals[:] = 0 self.actions[:] = actions binding.vec_step(self.c_envs) self.tick += 1 info = [] if self.tick % self.report_interval == 0: - log = binding.vec_log(self.c_envs, self.num_agents) - if log: - info.append(log) + if per_env_logs: # Get the stats for every separate env + logs = self.get_env_logs() + if any(logs): + info = logs + else: # Default: Aggregate across vectorized envs + log = binding.vec_log(self.c_envs, 
self.num_agents) + if log: + info.append(log) if self.tick > 0 and self.resample_frequency > 0 and self.tick % self.resample_frequency == 0: self.resample_maps() @@ -736,12 +750,34 @@ def get_road_edge_polylines(self): return polylines - def render(self): - binding.vec_render(self.c_envs, 0) + def render(self, view_mode: RenderView = RenderView.FULL_SIM_STATE, draw_traces: bool = True, env_id: int = 0): + binding.vec_render(self.c_envs, int(view_mode), draw_traces, env_id) + + def set_video_suffix(self, suffix: str, env_id: int = 0): + """Set the suffix appended to the mp4 filename for headless rendering. + + Must be called before the first render() call of a rollout. + E.g. set_video_suffix("_bev", env_id=0) → {scenario_id}_bev.mp4 + """ + binding.vec_set_video_suffix(self.c_envs, env_id, suffix) def close(self): binding.vec_close(self.c_envs) + def env_log(self, env_idx): + """Get log statistics for a single environment.""" + num_agents = self.agent_offsets[env_idx + 1] - self.agent_offsets[env_idx] + return binding.env_log(self.env_ids[env_idx], num_agents) + + def get_env_logs(self): + """Get log statistics for all environments (unaggregated).""" + return [self.env_log(i) for i in range(self.num_envs)] + + @property + def scenario_ids(self) -> list[str]: + """Return scenario ID string for each env, stripping null padding.""" + return [s.rstrip("\x00") for s in binding.vec_get_scenario_ids(self.c_envs)] + def calculate_area(p1, p2, p3): # 3D triangle area = 0.5 * ||(p2-p1) x (p3-p1)|| diff --git a/pufferlib/ocean/drive/rollout.py b/pufferlib/ocean/drive/rollout.py new file mode 100644 index 000000000..5c5c1a29c --- /dev/null +++ b/pufferlib/ocean/drive/rollout.py @@ -0,0 +1,142 @@ +"""Shared rollout loop for Drive evaluation and rendering. + +Single source of truth for the forward-sample-step-break cycle. 
Used by: + - ``Evaluator._run_rollout`` — periodic training eval, optional env-selective render + - ``pufferl.render`` — offline batch rendering, one video per map + - ``SafeEvaluator.render`` — safe-eval-time rendering + +Extracting this eliminates three hand-maintained copies of the same loop that +had drifted in subtle ways: + * ``render_one_map`` was missing continuous-action clipping + * ``render_one_map`` used ``done.all() or truncated.all()`` while the others used ``truncs.all()`` + * ``render_one_map`` inferred video filenames via a ``glob`` diff instead of calling ``set_video_suffix`` + * ``SafeEvaluator`` was using ``hasattr(policy, "hidden_size")`` as a proxy + for "is an RNN policy", which is fragile (many non-RNN policies also + expose ``hidden_size``). Callers now pass the authoritative ``use_rnn`` + flag from the training config. + +Callers pass a ``RenderContext`` to turn on rendering; pass ``None`` for a pure +stats rollout. +""" + +from dataclasses import dataclass +from typing import Optional + +import numpy as np +import torch + +import pufferlib.pytorch + + +@dataclass +class RenderContext: + """Enables rendering inside :func:`rollout_loop`. + + Attributes: + view_mode: ``RenderView`` enum value passed to ``driver.render``. + env_id: which sub-env in the vecenv to record from (default 0). + draw_traces: whether the C renderer should overlay trajectory traces. + video_suffix: appended to the mp4 filename; applied once before the + first render via ``set_video_suffix`` so multi-view rollouts don't + collide on output paths. + """ + + view_mode: int + env_id: int = 0 + draw_traces: bool = True + video_suffix: str = "" + + +def rollout_loop( + policy, + env, + device, + use_rnn: bool, + max_steps: Optional[int] = None, + render_ctx: Optional[RenderContext] = None, + per_env_logs: bool = False, +): + """Run a single policy rollout in a Drive vecenv. + + Args: + policy: the policy to run. Caller is responsible for calling ``.eval()``. 
+ env: a ``PufferEnv``-compatible vecenv wrapping one or more Drive sub-envs. + device: torch device for observation / state tensors. + use_rnn: whether to allocate and carry LSTM hidden state. + max_steps: loop iteration cap. Defaults to ``env.driver_env.episode_length``. + render_ctx: if set, render the specified env/view every step before + sampling actions. Filename suffix is applied via ``set_video_suffix``. + per_env_logs: passed through to ``env.step`` so callers can get + unaggregated per-env logs instead of the default ``vec_log`` aggregate. + + Returns: + The last ``info`` returned by ``env.step``. For stats rollouts this is + the aggregated ``vec_log`` dict or per-env logs list; for render + rollouts callers typically ignore it. + """ + driver = env.driver_env + num_agents = env.observation_space.shape[0] + + # Set (or clear) the video filename suffix before the first render call so + # the C binding names the mp4 correctly up front (e.g. {scenario_id}_bev.mp4). + # Without this, callers have to glob-diff cwd and rename post hoc. We call + # this unconditionally whenever render_ctx is provided — including the + # empty-string case — so reused envs can't leak a stale suffix from a + # prior rollout into the default-filename path. + if render_ctx is not None: + driver.set_video_suffix(render_ctx.video_suffix, env_id=render_ctx.env_id) + + obs, _ = env.reset() + + state = {} + if use_rnn: + state = dict( + lstm_h=torch.zeros(num_agents, policy.hidden_size, device=device), + lstm_c=torch.zeros(num_agents, policy.hidden_size, device=device), + ) + + if max_steps is None: + max_steps = driver.episode_length + + info = [] + for _ in range(max_steps): + # Render BEFORE the step so each frame shows the state the policy was + # conditioned on. 
+ if render_ctx is not None: + driver.render( + view_mode=render_ctx.view_mode, + draw_traces=render_ctx.draw_traces, + env_id=render_ctx.env_id, + ) + + with torch.no_grad(): + ob_t = torch.as_tensor(obs).to(device) + logits, _ = policy.forward_eval(ob_t, state) + action, _, _ = pufferlib.pytorch.sample_logits(logits) + action_np = action.cpu().numpy().reshape(env.action_space.shape) + + # Clip continuous actions to the valid range. This was previously + # missing in render_one_map, so continuous policies could emit OOB + # actions during offline rendering. + if isinstance(logits, torch.distributions.Normal): + action_np = np.clip(action_np, env.action_space.low, env.action_space.high) + + # Only pass per_env_logs when the caller explicitly asked for it. + # pufferlib.vector.Serial.step == pufferlib.vector.step, whose + # signature is (vecenv, actions) with no kwargs — passing per_env_logs + # unconditionally would raise TypeError on render_one_map's Serial + # backend. The per_env_logs=True path is only exercised by the + # Evaluator with a native PufferEnv backend where Drive.step does + # accept the kwarg. + if per_env_logs: + obs, _, _, truncs, info = env.step(action_np, per_env_logs=True) + else: + obs, _, _, truncs, info = env.step(action_np) + + # truncs.all() is set in the single c_step where the env auto-resets + # (time limit or early termination). Breaking here exits the loop + # exactly when the current episode wraps. 
+ if truncs.all(): + break + + return info diff --git a/pufferlib/ocean/env_binding.h b/pufferlib/ocean/env_binding.h index 44eeacb40..d4090ea46 100644 --- a/pufferlib/ocean/env_binding.h +++ b/pufferlib/ocean/env_binding.h @@ -197,7 +197,7 @@ static PyObject *env_reset(PyObject *self, PyObject *args) { static PyObject *env_step(PyObject *self, PyObject *args) { int num_args = PyTuple_Size(args); if (num_args != 1) { - PyErr_SetString(PyExc_TypeError, "vec_render requires 1 argument"); + PyErr_SetString(PyExc_TypeError, "env_step requires 1 argument"); return NULL; } @@ -209,13 +209,34 @@ static PyObject *env_step(PyObject *self, PyObject *args) { Py_RETURN_NONE; } -// Python function to step the environment +// Python function to render the environment static PyObject *env_render(PyObject *self, PyObject *args) { + int num_args = PyTuple_Size(args); + if (num_args != 3) { + PyErr_SetString(PyExc_TypeError, "env_render requires 3 arguments (env_handle, view_mode, draw_traces)"); + return NULL; + } Env *env = unpack_env(args); if (!env) { return NULL; } - c_render(env); + + PyObject *view_mode_arg = PyTuple_GetItem(args, 1); + if (!PyObject_TypeCheck(view_mode_arg, &PyLong_Type)) { + PyErr_SetString(PyExc_TypeError, "view_mode must be an integer"); + return NULL; + } + int view_mode = PyLong_AsLong(view_mode_arg); + + PyObject *show_traces_arg = PyTuple_GetItem(args, 2); + if (!PyObject_TypeCheck(show_traces_arg, &PyBool_Type)) { + PyErr_SetString(PyExc_TypeError, "draw_traces must be a boolean"); + return NULL; + } + bool draw_traces = PyObject_IsTrue(show_traces_arg); + + c_render(env, view_mode, draw_traces); + Py_RETURN_NONE; } @@ -529,10 +550,40 @@ static PyObject *vec_step(PyObject *self, PyObject *arg) { Py_RETURN_NONE; } +static PyObject *vec_set_video_suffix(PyObject *self, PyObject *args) { + // Set a suffix appended to the mp4 filename for headless rendering. + // Call this before the first vec_render of each rollout when using multi-view. 
+ // Args: (vec_env_ptr, env_id, suffix_str) + int num_args = PyTuple_Size(args); + if (num_args != 3) { + PyErr_SetString(PyExc_TypeError, "vec_set_video_suffix requires 3 arguments: (vec, env_id, suffix)"); + return NULL; + } + VecEnv *vec = (VecEnv *)PyLong_AsVoidPtr(PyTuple_GetItem(args, 0)); + if (!vec) { + PyErr_SetString(PyExc_ValueError, "Invalid vec_env handle"); + return NULL; + } + int env_id = (int)PyLong_AsLong(PyTuple_GetItem(args, 1)); + const char *suffix = PyUnicode_AsUTF8(PyTuple_GetItem(args, 2)); + if (!suffix) { + PyErr_SetString(PyExc_TypeError, "suffix must be a string"); + return NULL; + } + if (env_id < 0 || env_id >= vec->num_envs) { + PyErr_SetString(PyExc_IndexError, "env_id out of range"); + return NULL; + } + Drive *env = (Drive *)vec->envs[env_id]; + strncpy(env->video_suffix, suffix, sizeof(env->video_suffix) - 1); + env->video_suffix[sizeof(env->video_suffix) - 1] = '\0'; + Py_RETURN_NONE; +} + static PyObject *vec_render(PyObject *self, PyObject *args) { int num_args = PyTuple_Size(args); - if (num_args != 2) { - PyErr_SetString(PyExc_TypeError, "vec_render requires 2 arguments"); + if (num_args != 4) { + PyErr_SetString(PyExc_TypeError, "vec_render requires 4 arguments"); return NULL; } @@ -542,14 +593,24 @@ static PyObject *vec_render(PyObject *self, PyObject *args) { return NULL; } - PyObject *env_id_arg = PyTuple_GetItem(args, 1); - if (!PyObject_TypeCheck(env_id_arg, &PyLong_Type)) { + if (!PyObject_TypeCheck(PyTuple_GetItem(args, 1), &PyLong_Type)) { + PyErr_SetString(PyExc_TypeError, "view_mode must be an integer"); + return NULL; + } + if (!PyObject_TypeCheck(PyTuple_GetItem(args, 2), &PyBool_Type)) { + PyErr_SetString(PyExc_TypeError, "draw_traces must be a boolean"); + return NULL; + } + if (!PyObject_TypeCheck(PyTuple_GetItem(args, 3), &PyLong_Type)) { PyErr_SetString(PyExc_TypeError, "env_id must be an integer"); return NULL; } - int env_id = PyLong_AsLong(env_id_arg); - c_render(vec->envs[env_id]); + int view_mode = 
PyLong_AsLong(PyTuple_GetItem(args, 1)); + bool draw_traces = PyObject_IsTrue(PyTuple_GetItem(args, 2)); + int env_id = PyLong_AsLong(PyTuple_GetItem(args, 3)); + + c_render(vec->envs[env_id], view_mode, draw_traces); Py_RETURN_NONE; } @@ -629,6 +690,60 @@ static PyObject *vec_log(PyObject *self, PyObject *args) { return dict; } +static PyObject *env_log(PyObject *self, PyObject *args) { + int num_args = PyTuple_Size(args); + if (num_args != 2) { + PyErr_SetString(PyExc_TypeError, "env_log requires 2 arguments"); + return NULL; + } + + Env *env = unpack_env(args); + if (!env) { + return NULL; + } + + PyObject *num_agents_arg = PyTuple_GetItem(args, 1); + float num_agents = (float)PyLong_AsLong(num_agents_arg); + + // Aggregate this env's per-agent logs (same as vec_log but for one env) + // Note: breaks horribly if you don't use floats + Log aggregate = {0}; + int num_keys = sizeof(Log) / sizeof(float); + for (int j = 0; j < num_keys; j++) { + ((float *)&aggregate)[j] += ((float *)&env->log)[j]; + } + + PyObject *dict = PyDict_New(); + + // Mirror vec_log: only report when enough data has accumulated + if (aggregate.n < num_agents) { + return dict; + } + + // Reset log now that we've consumed it, mirroring vec_log + for (int j = 0; j < num_keys; j++) { + ((float *)&env->log)[j] = 0.0f; + } + + // Average across agents in env + float n = aggregate.n; + for (int i = 0; i < num_keys; i++) { + ((float *)&aggregate)[i] /= n; + } + + // Compute completion_rate from raw counts (mirrors vec_log) + if (aggregate.goals_attempted_this_episode > 0.0f) { + aggregate.completion_rate = aggregate.goals_reached_this_episode / aggregate.goals_attempted_this_episode; + } else { + aggregate.completion_rate = 0.0f; + } + + my_log(dict, &aggregate); + assign_to_dict(dict, "n", n); + + return dict; +} + static PyObject *vec_close(PyObject *self, PyObject *args) { VecEnv *vec = unpack_vecenv(args); if (!vec) { @@ -684,6 +799,20 @@ static PyObject *get_global_agent_state(PyObject *self, 
PyObject *args) { Py_RETURN_NONE; } + +static PyObject *vec_get_scenario_ids(PyObject *self, PyObject *args) { + VecEnv *vec = unpack_vecenv(args); + if (!vec) + return NULL; + + PyObject *list = PyList_New(vec->num_envs); + for (int i = 0; i < vec->num_envs; i++) { + // scenario_id is char[16], may not be null-terminated at byte 16 + PyList_SET_ITEM(list, i, PyUnicode_FromStringAndSize(vec->envs[i]->scenario_id, 16)); + } + return list; +} + static PyObject *vec_get_global_agent_state(PyObject *self, PyObject *args) { if (PyTuple_Size(args) != 8) { PyErr_SetString(PyExc_TypeError, "vec_get_global_agent_state requires 8 arguments"); @@ -989,7 +1118,9 @@ static PyMethodDef methods[] = { {"vec_step", vec_step, METH_VARARGS, "Step the vector of environments"}, {"vec_log", vec_log, METH_VARARGS, "Log the vector of environments"}, {"vec_render", vec_render, METH_VARARGS, "Render the vector of environments"}, + {"vec_set_video_suffix", vec_set_video_suffix, METH_VARARGS, "Set mp4 filename suffix for the given env"}, {"vec_close", vec_close, METH_VARARGS, "Close the vector of environments"}, + {"vec_get_scenario_ids", vec_get_scenario_ids, METH_VARARGS, "Get scenario IDs for all envs"}, {"shared", (PyCFunction)my_shared, METH_VARARGS | METH_KEYWORDS, "Shared state"}, {"get_global_agent_state", get_global_agent_state, METH_VARARGS, "Get global agent state"}, {"vec_get_global_agent_state", vec_get_global_agent_state, METH_VARARGS, "Get agent state from vectorized env"}, @@ -1000,6 +1131,7 @@ static PyMethodDef methods[] = { "Get road edge polyline counts from vectorized env"}, {"vec_get_road_edge_polylines", vec_get_road_edge_polylines, METH_VARARGS, "Get road edge polylines from vectorized env"}, + {"env_log", env_log, METH_VARARGS, "Log a single environment"}, MY_METHODS, {NULL, NULL, 0, NULL}}; diff --git a/pufferlib/ocean/env_config.h b/pufferlib/ocean/env_config.h index 8105a40e9..8aebe15ea 100644 --- a/pufferlib/ocean/env_config.h +++ b/pufferlib/ocean/env_config.h @@ 
-8,6 +8,7 @@ // Config struct for parsing INI files - contains all environment configuration typedef struct { + int render_mode; int action_type; int dynamics_model; float reward_vehicle_collision; @@ -110,6 +111,8 @@ static int handler(void *config, const char *section, const char *name, const ch printf("Warning: Unknown dynamics_model value '%s', defaulting to JERK\n", value); env_config->dynamics_model = 1; // Default to JERK } + } else if (MATCH("env", "render_mode")) { + env_config->render_mode = atoi(value); } else if (MATCH("env", "goal_behavior")) { env_config->goal_behavior = atoi(value); } else if (MATCH("env", "reward_randomization")) { diff --git a/pufferlib/pufferl.py b/pufferlib/pufferl.py index 424baf4f6..e9d84b41f 100644 --- a/pufferlib/pufferl.py +++ b/pufferlib/pufferl.py @@ -15,6 +15,7 @@ import random import shutil import subprocess +import tempfile import argparse import importlib import configparser @@ -37,6 +38,9 @@ import pufferlib.pytorch import pufferlib.utils +from pufferlib.ocean.benchmark.evaluator import Evaluator + + try: from pufferlib import _C except ImportError: @@ -55,7 +59,6 @@ import signal # Aggressively exit on ctrl+c import multiprocessing -import queue signal.signal(signal.SIGINT, lambda sig, frame: os._exit(0)) @@ -64,7 +67,8 @@ class PuffeRL: - def __init__(self, config, vecenv, policy, logger=None): + def __init__(self, config, vecenv, policy, logger=None, full_args=None): + self.full_args = full_args # Backend perf optimization torch.set_float32_matmul_precision("high") torch.backends.cudnn.deterministic = config["torch_deterministic"] @@ -124,17 +128,8 @@ def __init__(self, config, vecenv, policy, logger=None): self.ep_lengths = torch.zeros(total_agents, device=device, dtype=torch.int32) self.ep_indices = torch.arange(total_agents, device=device, dtype=torch.int32) self.free_idx = total_agents - self.render = config["render"] - self.render_async = config["render_async"] - self.render_interval = 
config["render_interval"] - - safe_eval_renders = config.get("safe_eval", {}).get("enabled", False) - if self.render or safe_eval_renders: - ensure_drive_binary() - - if self.render_async: - self.render_queue = multiprocessing.Queue() - self.render_processes = [] + # Use a safe default when eval settings are absent + self.eval_interval = config.get("eval", {}).get("eval_interval", 0) # LSTM if config["use_rnn"]: @@ -203,9 +198,6 @@ def __init__(self, config, vecenv, policy, logger=None): self.logger = logger if logger is None: self.logger = NoLogger(config) - if self.render_async and hasattr(self.logger, "wandb") and self.logger.wandb: - self.logger.wandb.define_metric("render_step", hidden=True) - self.logger.wandb.define_metric("render/*", step_metric="render_step") # Learning rate scheduler epochs = config["total_timesteps"] // config["batch_size"] @@ -516,60 +508,34 @@ def train(self): self.save_checkpoint() self.msg = f"Checkpoint saved at update {self.epoch}" - if self.render and self.epoch % self.render_interval == 0: - model_dir = os.path.join(self.config["data_dir"], f"{self.config['env']}_{self.logger.run_id}") - model_files = glob.glob(os.path.join(model_dir, "model_*.pt")) - - if model_files: - # Take the latest checkpoint - latest_cpt = max(model_files, key=os.path.getctime) - bin_path = f"{model_dir}.bin" - - # Export to .bin for rendering with raylib - try: - export_args = {"env_name": self.config["env"], "load_model_path": latest_cpt, **self.config} - - export( - args=export_args, - env_name=self.config["env"], - vecenv=self.vecenv, - policy=self.uncompiled_policy, - path=bin_path, - silent=True, - ) - - bin_path_epoch = f"{model_dir}_epoch_{self.epoch:06d}.bin" - shutil.copy2(bin_path, bin_path_epoch) - - driver_env = getattr(self.vecenv, "driver_env", None) - render_ini = pufferlib.utils.generate_env_ini( - self.config.get("env_config", {}), prefix="render_" - ) - self._render_videos( - bin_path=bin_path_epoch, - num_maps=getattr(driver_env, 
"num_maps", None), - map_dir=getattr(driver_env, "map_dir", None), - wandb_prefix="render", - config_path=render_ini, - cleanup_files=[bin_path_epoch, bin_path, render_ini], - ) - - except Exception as e: - print(f"Failed to export model weights: {e}") - - if self.config["eval"]["wosac_realism_eval"] and ( - (self.epoch - 1) % self.config["eval"]["eval_interval"] == 0 or done_training - ): - pufferlib.utils.run_wosac_eval_in_subprocess(self.config, self.logger, self.global_step) - - if self.config["eval"]["human_replay_eval"] and ( - (self.epoch - 1) % self.config["eval"]["eval_interval"] == 0 or done_training - ): - pufferlib.utils.run_human_replay_eval_in_subprocess(self.config, self.logger, self.global_step) + if self.epoch % self.config["eval"]["eval_interval"] == 0 or done_training: + human_replay_eval = self.config["eval"]["human_replay_eval"] + self_play_eval = self.config["eval"]["self_play_eval"] + + self.evaluator = Evaluator(self.full_args, self.logger) + if human_replay_eval: + self.evaluator.hr_env = load_env(self.config["env"], self.evaluator.hr_eval_config) + self.evaluator.rollout(self.uncompiled_policy, mode="human_replay") + self.evaluator.hr_env.close() + self.evaluator.log_videos(eval_mode="human_replay", epoch=self.epoch) + if self_play_eval: + self.evaluator.sp_env = load_env(self.config["env"], self.evaluator.sp_eval_config) + self.evaluator.rollout(self.uncompiled_policy, mode="self_play") + self.evaluator.sp_env.close() + self.evaluator.log_videos(eval_mode="self_play", epoch=self.epoch) + if human_replay_eval or self_play_eval: + self.evaluator.log_stats() + + del self.evaluator + + if self.config["eval"]["wosac_realism_eval"] and ( + (self.epoch) % self.config["eval"]["eval_interval"] == 0 or done_training + ): + pufferlib.utils.run_wosac_eval_in_subprocess(self.config, self.logger, self.global_step) safe_eval_config = self.config.get("safe_eval", {}) safe_eval_enabled = safe_eval_config.get("enabled", False) - safe_eval_interval = 
int(safe_eval_config.get("interval", self.render_interval)) + safe_eval_interval = int(safe_eval_config.get("interval", self.eval_interval)) is_main = not torch.distributed.is_initialized() or torch.distributed.get_rank() == 0 if ( is_main @@ -579,77 +545,21 @@ def train(self): ): self._run_safe_eval() - def _render_videos( - self, - bin_path, - num_maps=None, - map_dir=None, - wandb_prefix="render", - config_path=None, - cleanup_files=None, - ): - """Render videos, either async (background process) or sync (blocking).""" - wandb_log = hasattr(self.logger, "wandb") and self.logger.wandb is not None - render_kwargs = dict( - config=self.config, - run_id=self.logger.run_id, - num_maps=num_maps, - map_dir=map_dir, - wandb_log=wandb_log, - epoch=self.epoch, - global_step=self.global_step, - bin_path=bin_path, - config_path=config_path, - wandb_prefix=wandb_prefix, - ) - - if self.render_async: - self._reap_render_processes() - max_processes = 3 - if len(self.render_processes) >= max_processes: - print(f"Waiting for render processes to finish ({len(self.render_processes)}/{max_processes})...") - while len(self.render_processes) >= max_processes: - time.sleep(1) - self._reap_render_processes() - - render_proc = multiprocessing.Process( - target=pufferlib.utils.render_videos_and_cleanup, - kwargs={ - **render_kwargs, - "cleanup_files": cleanup_files or [], - "render_async": True, - "render_queue": self.render_queue, - }, - daemon=True, - ) - render_proc.start() - self.render_processes.append(render_proc) - else: - pufferlib.utils.render_videos( - **render_kwargs, - render_async=False, - wandb_run=self.logger.wandb if wandb_log else None, - ) - for f in cleanup_files or []: - if os.path.exists(f): - os.remove(f) - def _run_safe_eval(self): - """Run safe eval in-process using SafeEvaluator, then render videos.""" + """Run safe eval in-process using SafeEvaluator.""" import copy import traceback vecenv = None - bin_path = None - safe_ini_path = None - render_handed_off = 
False try: from pufferlib.ocean.benchmark.evaluator import SafeEvaluator self.msg = "Running safe eval..." env_name = self.config["env"] safe_eval_config = self.config.get("safe_eval", {}) - evaluator = SafeEvaluator(env_name, safe_eval_config, device=self.config["device"], logger=self.logger) + evaluator = SafeEvaluator( + env_name, safe_eval_config, device=self.config["device"], logger=self.logger, full_config=self.config + ) eval_config = evaluator._build_eval_env_config() vecenv = load_env(env_name, eval_config) @@ -664,34 +574,12 @@ def _run_safe_eval(self): metrics = evaluator.evaluate(vecenv, policy) evaluator.log_stats(global_step=self.global_step) - self.msg = f"Safe eval: {len(metrics)} metrics logged" - - self.msg = "Spawning safe eval render..." - model_dir = os.path.join(self.config["data_dir"], f"{self.config['env']}_{self.logger.run_id}") - bin_path = f"{model_dir}_safe_eval_epoch_{self.epoch:06d}.bin" - - export( - args={"env_name": env_name, "load_model_path": "unused", **self.config}, - env_name=env_name, - vecenv=self.vecenv, - policy=self.uncompiled_policy, - path=bin_path, - silent=True, - ) - - safe_ini_path = pufferlib.utils.generate_safe_eval_ini(safe_eval_config) - - self._render_videos( - bin_path=bin_path, - num_maps=safe_eval_config.get("num_maps"), - map_dir=safe_eval_config.get("map_dir"), - wandb_prefix="eval", - config_path=safe_ini_path, - cleanup_files=[bin_path, safe_ini_path], - ) - render_handed_off = True - self.msg = f"Safe eval complete: {len(metrics)} metrics logged" + if evaluator.render_safe_eval: + self.msg = "Rendering safe eval..." 
+ evaluator.render(eval_config, policy) + evaluator.log_videos(epoch=self.epoch) + self.msg = f"Safe eval: {len(metrics)} metrics logged" except Exception as e: self.msg = f"Safe eval failed: {e}" traceback.print_exc(file=sys.stderr) @@ -701,64 +589,8 @@ def _run_safe_eval(self): vecenv.close() except Exception: pass - if not render_handed_off: - for f in [bin_path, safe_ini_path]: - if f and os.path.exists(f): - try: - os.remove(f) - except OSError: - pass - - def _reap_render_processes(self): - """Remove finished render processes from the tracking list.""" - if not self.render_async: - return - alive = [] - for p in self.render_processes: - if p.is_alive(): - alive.append(p) - else: - p.join(timeout=1) - self.render_processes = alive - - def check_render_queue(self): - """Check if any async render jobs finished and log them.""" - if not self.render_async: - return - self._reap_render_processes() - - try: - while True: - try: - result = self.render_queue.get_nowait() - except queue.Empty: - break - - step = result["step"] - videos = result["videos"] - prefix = result.get("prefix", "render") - - if hasattr(self.logger, "wandb") and self.logger.wandb: - import wandb - - payload = {} - if videos["output_topdown"]: - payload[f"{prefix}/world_state"] = [ - wandb.Video(p, format="mp4") for p in videos["output_topdown"] - ] - if videos["output_agent"]: - payload[f"{prefix}/agent_view"] = [wandb.Video(p, format="mp4") for p in videos["output_agent"]] - - if payload: - payload["train_step"] = step - self.logger.wandb.log(payload) - except Exception as e: - print(f"Error reading render queue: {e}") def mean_and_log(self): - # Check render queue for finished async jobs - self.check_render_queue() - config = self.config for k in list(self.stats.keys()): v = self.stats[k] @@ -795,22 +627,6 @@ def close(self): self.vecenv.close() self.utilization.stop() - if self.render_async: - # Drain any remaining async render results before closing - self.check_render_queue() - for p in 
self.render_processes: - try: - if p.is_alive(): - p.terminate() - p.join(timeout=5) - if p.is_alive(): - p.kill() - except Exception: - print(f"Failed to terminate render process {p.pid}") - self.render_processes = [] - self.render_queue.close() - self.render_queue.join_thread() - model_path = self.save_checkpoint() run_id = self.logger.run_id project_name = "puffer_drive" @@ -1274,7 +1090,7 @@ def train(env_name, args=None, vecenv=None, policy=None, logger=None): ) if "vec" in args and "num_workers" in args["vec"]: train_config["num_workers"] = args["vec"]["num_workers"] - pufferl = PuffeRL(train_config, vecenv, policy, logger) + pufferl = PuffeRL(train_config, vecenv, policy, logger, full_args=args) all_logs = [] while pufferl.global_step < train_config["total_timesteps"]: @@ -1365,38 +1181,6 @@ def eval(env_name, args=None, vecenv=None, policy=None): vecenv.close() return results_dict - elif human_replay_enabled: - args["env"]["map_dir"] = args["eval"]["map_dir"] - dataset_name = args["env"]["map_dir"].split("/")[-1] - print(f"Running human replay evaluation with {dataset_name} dataset.\n") - from pufferlib.ocean.benchmark.evaluator import HumanReplayEvaluator - - backend = args["eval"].get("backend", "PufferEnv") - args["env"]["map_dir"] = args["eval"]["map_dir"] - args["env"]["num_agents"] = args["eval"]["human_replay_num_agents"] - - args["vec"] = dict(backend=backend, num_envs=1) - args["env"]["control_mode"] = args["eval"]["human_replay_control_mode"] - args["env"]["episode_length"] = 91 # WOMD scenario length - - vecenv = vecenv or load_env(env_name, args) - policy = policy or load_policy(args, vecenv, env_name) - - print(f"Effective number of scenarios used: {len(vecenv.driver_env.agent_offsets) - 1}") - - evaluator = HumanReplayEvaluator(args) - - # Run rollouts with human replays - results = evaluator.rollout(args, vecenv, policy) - - import json - - print("HUMAN_REPLAY_METRICS_START") - print(json.dumps(results)) - print("HUMAN_REPLAY_METRICS_END") - 
- return results - else: # Standard evaluation: Render backend = args["vec"]["backend"] if backend != "PufferEnv": @@ -1418,23 +1202,12 @@ def eval(env_name, args=None, vecenv=None, policy=None): lstm_c=torch.zeros(num_agents, policy.hidden_size, device=device), ) - frames = [] + if driver.render_mode == 1: + max_frames = 91 + frame_count = 0 + while True: - render = driver.render() - if len(frames) < args["save_frames"]: - frames.append(render) - - # Screenshot Ocean envs with F12, gifs with control + F12 - if driver.render_mode == "ansi": - print("\033[0;0H" + render + "\n") - time.sleep(1 / args["fps"]) - elif driver.render_mode == "rgb_array": - pass - # import cv2 - # render = cv2.cvtColor(render, cv2.COLOR_RGB2BGR) - # cv2.imshow('frame', render) - # cv2.waitKey(1) - # time.sleep(1/args['fps']) + driver.render() with torch.no_grad(): ob = torch.as_tensor(ob).to(device) @@ -1445,13 +1218,14 @@ def eval(env_name, args=None, vecenv=None, policy=None): if isinstance(logits, torch.distributions.Normal): action = np.clip(action, vecenv.action_space.low, vecenv.action_space.high) - ob = vecenv.step(action)[0] + ob, reward, done, truncated, info = vecenv.step(action) - if len(frames) > 0 and len(frames) == args["save_frames"]: - import imageio + if driver.render_mode == 1: + frame_count += 1 + if frame_count >= max_frames or done.all() or truncated.all(): + break - imageio.mimsave(args["gif_path"], frames, fps=args["fps"], loop=0) - frames.append("Done") + vecenv.close() def sweep(args=None, env_name=None): @@ -1641,27 +1415,6 @@ def export(args=None, env_name=None, vecenv=None, policy=None, path=None, silent print(f"Saved {len(weights)} weights to {path}") -def ensure_drive_binary(): - """Delete existing visualize binary and rebuild it. This ensures the - binary is always up-to-date with the latest code changes. 
- """ - if os.path.exists("./visualize"): - os.remove("./visualize") - - try: - result = subprocess.run( - ["bash", "scripts/build_ocean.sh", "visualize", "fast"], capture_output=True, text=True, timeout=300 - ) - - if result.returncode != 0: - print(f"Build failed: {result.stderr}") - raise RuntimeError("Failed to build visualize binary for rendering") - except subprocess.TimeoutExpired: - raise RuntimeError("Build timed out") - except Exception as e: - raise RuntimeError(f"Build error: {e}") - - def autotune(args=None, env_name=None, vecenv=None, policy=None): package = args["package"] module_name = "pufferlib.ocean" if package == "ocean" else f"pufferlib.environments.{package}" @@ -1731,6 +1484,7 @@ def load_config(env_name, config_dir=None): add_help=False, ) parser.add_argument("--load-model-path", type=str, default=None, help="Path to a pretrained checkpoint") + parser.add_argument("--export-path", type=str, default=None, help="Output path for puffer export (.bin file)") parser.add_argument( "--load-id", type=str, default=None, help="Kickstart/eval from from a finished Wandb/Neptune run" ) @@ -1806,85 +1560,158 @@ def puffer_type(value): def render(env_name, args=None): + """Render rollouts for a batch of maps using the in-process c_render pipeline. + + Each map is loaded as a separate environment with render_mode=1 (headless + ffmpeg). A policy rollout is run for max_frames steps, producing one + {scenario_id}.mp4 per env in the current working directory. The files are + then moved into output_dir. 
+ """ + from pufferlib.ocean.drive.drive import RenderView + args = args or load_config(env_name) render_configs = args.get("render", {}) - # Renders first num_maps from map_dir using visualize binary try: map_dir = render_configs["map_dir"] num_maps = render_configs.get("num_maps", 1) - view_mode = render_configs["view_mode"] - render_policy_path = render_configs["policy_path"] - overwork = render_configs.get("overwork", False) - num_workers = args["vec"]["num_workers"] + view_mode_str = str(render_configs.get("view_mode", "sim_state")).lower().strip('"').strip("'") + draw_traces = render_configs.get("draw_traces", True) + max_frames = render_configs.get("max_frames", 91) output_dir = render_configs["output_dir"] + # Allow [render] to override [env] init/control modes — important for logged-trajectory + # maps (e.g. carla_2D) which need create_all_valid + control_vehicles, not the training + # init_variable_agent_number mode that spawns agents on road lanes. + render_init_mode = ( + str(render_configs["init_mode"]).strip('"').strip("'") if "init_mode" in render_configs else None + ) + render_control_mode = ( + str(render_configs["control_mode"]).strip('"').strip("'") if "control_mode" in render_configs else None + ) except KeyError as e: raise pufferlib.APIUsageError(f"Missing render config: {e}") - cpu_cores = psutil.cpu_count(logical=False) - if num_workers > cpu_cores and not overwork: + # Map config string → list of RenderView enums to render + # "both" = sim_state + persp, "all" = sim_state + persp + bev + _VIEW_MODE_MAP = { + "sim_state": [RenderView.FULL_SIM_STATE], + "topdown": [RenderView.FULL_SIM_STATE], # backward compat + "bev": [RenderView.BEV_AGENT_OBS], + "agent": [RenderView.BEV_AGENT_OBS], # backward compat + "persp": [RenderView.AGENT_PERSP], + "both": [RenderView.FULL_SIM_STATE, RenderView.AGENT_PERSP], + "all": [RenderView.FULL_SIM_STATE, RenderView.AGENT_PERSP, RenderView.BEV_AGENT_OBS], + } + view_modes = _VIEW_MODE_MAP.get(view_mode_str) + 
if view_modes is None: raise pufferlib.APIUsageError( - " ".join( - [ - f"num_workers ({num_workers}) > hardware cores ({cpu_cores}) is disallowed by default.", - "PufferLib multiprocessing is heavily optimized for 1 process per hardware core.", - "If you really want to do this, set overwork=True (--vec-overwork in our demo.py).", - ] - ) + f"Unknown view_mode '{view_mode_str}'. Choose from: sim_state, bev, persp, both, all" ) - if num_maps > len(os.listdir(map_dir)): - num_maps = len(os.listdir(map_dir)) + bin_files = sorted(f for f in os.listdir(map_dir) if f.endswith(".bin")) + if num_maps > len(bin_files): + num_maps = len(bin_files) + render_maps = [os.path.join(map_dir, f) for f in bin_files[:num_maps]] - render_maps = [os.path.join(map_dir, f) for f in sorted(os.listdir(map_dir)) if f.endswith(".bin")][:num_maps] os.makedirs(output_dir, exist_ok=True) - # Rebuild visualize binary - ensure_drive_binary() + # Fall back to CPU if the configured device is unavailable (e.g. no CUDA on this machine) + configured_device = args["train"]["device"] + if configured_device == "cuda" and not torch.cuda.is_available(): + print("Warning: CUDA not available, falling back to CPU for render.") + configured_device = "cpu" + args["train"]["device"] = configured_device + device = configured_device + + def render_one_map(map_path): + """Render a single map file in one or more view modes, moving resulting mp4(s) to output_dir. + + Each view mode runs a separate rollout via the shared + :func:`pufferlib.ocean.drive.rollout.rollout_loop` helper, so that each + gets its own ffmpeg pipe and output file. Output files are named + ``{scenario_id}_{view}.mp4`` for multi-view runs or ``{scenario_id}.mp4`` + for single-view runs — the C binding writes the correct name directly + via ``set_video_suffix``, no post-hoc renaming required. 
+ """ + from pufferlib.ocean.drive.rollout import RenderContext, rollout_loop + + map_name = os.path.splitext(os.path.basename(map_path))[0] + + _VIEW_SUFFIX = { + RenderView.FULL_SIM_STATE: "sim_state", + RenderView.AGENT_PERSP: "persp", + RenderView.BEV_AGENT_OBS: "bev", + } + multi = len(view_modes) > 1 + + for view_mode in view_modes: + with tempfile.TemporaryDirectory() as tmp_map_dir: + tmp_bin = os.path.join(tmp_map_dir, os.path.basename(map_path)) + shutil.copy2(map_path, tmp_bin) + + env_overrides = { + **args["env"], + "num_maps": 1, + "map_dir": tmp_map_dir, + "render_mode": 1, # headless ffmpeg → writes {scenario_id}[suffix].mp4 in cwd + } + if render_init_mode is not None: + env_overrides["init_mode"] = render_init_mode + if render_control_mode is not None: + env_overrides["control_mode"] = render_control_mode + + map_args = { + **args, + "env": env_overrides, + "vec": {"backend": "Serial", "num_envs": 1}, + } + + env = load_env(env_name, map_args) + policy = load_policy(map_args, env, env_name) + policy.eval() + + # Clean up any stale mp4 from a previous run and snapshot cwd so + # we can find the new file(s) created during this rollout. 
+ suffix = f"_{_VIEW_SUFFIX[view_mode]}" if multi else "" + stale = os.path.join(os.getcwd(), f"{map_name}{suffix}.mp4") + if os.path.exists(stale): + os.remove(stale) + before = set(glob.glob(os.path.join(os.getcwd(), "*.mp4"))) + + rollout_loop( + policy=policy, + env=env, + device=device, + use_rnn=map_args["train"]["use_rnn"], + max_steps=max_frames, + render_ctx=RenderContext( + view_mode=view_mode, + env_id=0, + draw_traces=draw_traces, + video_suffix=suffix, + ), + ) - def render_task(map_path): - base_cmd = ( - ["./visualize"] - if sys.platform == "darwin" - else ["xvfb-run", "-a", "-s", "-screen 0 1280x720x24", "./visualize"] - ) - cmd = base_cmd.copy() - cmd.extend(["--map-name", map_path]) - if render_configs.get("show_grid", False): - cmd.append("--show-grid") - if render_configs.get("obs_only", False): - cmd.append("--obs-only") - if render_configs.get("show_lasers", False): - cmd.append("--lasers") - if render_configs.get("show_human_logs", False): - cmd.append("--show-human-logs") - if render_configs.get("zoom_in", False): - cmd.append("--zoom-in") - cmd.extend(["--view", view_mode]) - if render_policy_path is not None: - cmd.extend(["--policy-name", render_policy_path]) - - map_name = os.path.basename(map_path).replace(".bin", "") - - if view_mode == "topdown" or view_mode == "both": - cmd.extend(["--output-topdown", os.path.join(output_dir, f"topdown_{map_name}.mp4")]) - if view_mode == "agent" or view_mode == "both": - cmd.extend(["--output-agent", os.path.join(output_dir, f"agent_{map_name}.mp4")]) - - env_vars = os.environ.copy() - env_vars["ASAN_OPTIONS"] = "exitcode=0" - try: - result = subprocess.run(cmd, cwd=os.getcwd(), capture_output=True, text=True, timeout=600, env=env_vars) - if result.returncode != 0: - print(f"Error rendering {map_name}: {result.stderr}") - except subprocess.TimeoutExpired: - print(f"Timeout rendering {map_name}: exceeded 600 seconds") + env.close() + + # Move the newly produced mp4(s) to output_dir. 
With + # set_video_suffix, filenames are correct by construction — no + # post-hoc renaming needed. + after = set(glob.glob(os.path.join(os.getcwd(), "*.mp4"))) + new_mp4s = after - before + if new_mp4s: + for src in sorted(new_mp4s): + dst = os.path.join(output_dir, os.path.basename(src)) + shutil.move(src, dst) + print(f" Saved {dst}") + else: + print(f" Warning: no mp4 produced for map {map_name} view {_VIEW_SUFFIX[view_mode]}") if render_maps: - print(f"Rendering {len(render_maps)} from {map_dir} with {num_workers} workers...") - with ThreadPool(num_workers) as pool: - pool.map(render_task, render_maps) - print(f"Finished rendering videos to {output_dir}") + print(f"Rendering {len(render_maps)} map(s) from {map_dir} → {output_dir} ...") + for map_path in render_maps: + render_one_map(map_path) + print(f"Done. Videos written to {output_dir}") def main(): @@ -1907,7 +1734,8 @@ def main(): elif mode == "profile": profile(env_name=env_name) elif mode == "export": - export(env_name=env_name) + args = load_config(env_name) + export(env_name=env_name, args=args, path=args.get("export_path")) elif mode == "sanity": sanity(env_name=env_name) elif mode == "render": diff --git a/pufferlib/utils.py b/pufferlib/utils.py index 9eda5827e..9c6c5185c 100644 --- a/pufferlib/utils.py +++ b/pufferlib/utils.py @@ -62,73 +62,6 @@ def generate_safe_eval_ini(safe_eval_config, base_ini_path="pufferlib/config/oce return tmp_path -def run_human_replay_eval_in_subprocess(config, logger, global_step): - """ - Run human replay evaluation in a subprocess and log metrics to wandb. 
- - """ - try: - run_id = logger.run_id - model_dir = os.path.join(config["data_dir"], f"{config['env']}_{run_id}") - model_files = glob.glob(os.path.join(model_dir, "model_*.pt")) - - if not model_files: - print("No model files found for human replay evaluation") - return - - latest_cpt = max(model_files, key=os.path.getctime) - - # Prepare evaluation command - eval_config = config["eval"] - cmd = [ - sys.executable, - "-m", - "pufferlib.pufferl", - "eval", - config["env"], - "--load-model-path", - latest_cpt, - "--eval.wosac-realism-eval", - "False", - "--eval.human-replay-eval", - "True", - "--eval.human-replay-num-agents", - str(eval_config["human_replay_num_agents"]), - "--eval.human-replay-control-mode", - str(eval_config["human_replay_control_mode"]), - ] - - # Run human replay evaluation in subprocess - result = subprocess.run(cmd, capture_output=True, text=True, timeout=600, cwd=os.getcwd()) - - if result.returncode == 0: - # Extract JSON from stdout between markers - stdout = result.stdout - if "HUMAN_REPLAY_METRICS_START" in stdout and "HUMAN_REPLAY_METRICS_END" in stdout: - start = stdout.find("HUMAN_REPLAY_METRICS_START") + len("HUMAN_REPLAY_METRICS_START") - end = stdout.find("HUMAN_REPLAY_METRICS_END") - json_str = stdout[start:end].strip() - human_replay_metrics = json.loads(json_str) - - # Log to wandb if available - if hasattr(logger, "wandb") and logger.wandb: - logger.wandb.log( - { - "eval/human_replay_collision_rate": human_replay_metrics["collision_rate"], - "eval/human_replay_offroad_rate": human_replay_metrics["offroad_rate"], - "eval/human_replay_completion_rate": human_replay_metrics["completion_rate"], - }, - step=global_step, - ) - else: - print(f"Human replay evaluation failed with exit code {result.returncode}: {result.stderr}") - - except subprocess.TimeoutExpired: - print("Human replay evaluation timed out") - except Exception as e: - print(f"Failed to run human replay evaluation: {e}") - - def run_wosac_eval_in_subprocess(config, 
logger, global_step): """ Run WOSAC evaluation in a subprocess and log metrics to wandb. @@ -225,178 +158,3 @@ def run_wosac_eval_in_subprocess(config, logger, global_step): print(f"WOSAC evaluation ran out of memory. Skipping this evaluation: {e}") except Exception as e: print(f"Failed to run WOSAC evaluation: {type(e).__name__}: {e}") - - -def render_videos( - config, - run_id, - wandb_log, - epoch, - global_step, - bin_path, - render_async, - render_queue=None, - wandb_run=None, - config_path=None, - wandb_prefix="render", - num_maps=None, - map_dir=None, -): - """Generate and log training videos using C-based rendering.""" - if not os.path.exists(bin_path): - print(f"Binary weights file does not exist: {bin_path}") - return - - model_dir = os.path.join(config["data_dir"], f"{config['env']}_{run_id}") - - # Now call the C rendering function - try: - # Create output directory for videos - video_output_dir = os.path.join(model_dir, "videos") - os.makedirs(video_output_dir, exist_ok=True) - - # TODO: Fix memory leaks so that this is not needed - # Suppress AddressSanitizer exit code (temp) - env_vars = os.environ.copy() - env_vars["ASAN_OPTIONS"] = "exitcode=0" - - # Base command with only visualization flags (env config comes from INI) - base_cmd = ["xvfb-run", "-a", "-s", "-screen 0 1280x720x24", "./visualize"] - - if config_path: - base_cmd.extend(["--config", config_path]) - - # Visualization config flags only - if config.get("show_grid", False): - base_cmd.append("--show-grid") - if config.get("obs_only", False): - base_cmd.append("--obs-only") - if config.get("show_lasers", False): - base_cmd.append("--lasers") - if config.get("show_human_logs", False): - base_cmd.append("--log-trajectories") - if config.get("zoom_in", False): - base_cmd.append("--zoom-in") - - # Frame skip for rendering performance - frame_skip = config.get("frame_skip", 1) - if frame_skip > 1: - base_cmd.extend(["--frame-skip", str(frame_skip)]) - - # View mode - view_mode = 
config.get("view_mode", "both") - base_cmd.extend(["--view", view_mode]) - - if num_maps: - base_cmd.extend(["--num-maps", str(num_maps)]) - - base_cmd.extend(["--policy-name", bin_path]) - - # Handle single or multiple map rendering - render_maps = config.get("render_map", None) - if render_maps is None or render_maps == "none": - pass # use map_dir passed as parameter - if map_dir and os.path.isdir(map_dir): - bin_files = [f for f in os.listdir(map_dir) if f.endswith(".bin")] - if bin_files: - render_maps = [os.path.join(map_dir, random.choice(bin_files))] - else: - print(f"Warning: No .bin files found in {map_dir}, skipping render") - return - else: - print(f"Warning: map_dir not found or invalid ({map_dir}), skipping render") - return - elif isinstance(render_maps, (str, os.PathLike)): - render_maps = [render_maps] - else: - render_maps = list(render_maps) - - generated_videos = {"output_topdown": [], "output_agent": []} - output_topdown = f"resources/drive/{wandb_prefix}_output_topdown_{epoch}" - output_agent = f"resources/drive/{wandb_prefix}_output_agent_{epoch}" - - for i, map_path in enumerate(render_maps): - cmd = list(base_cmd) # copy - if os.path.exists(map_path): - cmd.extend(["--map-name", str(map_path)]) - - output_topdown_map = output_topdown + (f"_map{i:02d}.mp4" if len(render_maps) > 1 else ".mp4") - output_agent_map = output_agent + (f"_map{i:02d}.mp4" if len(render_maps) > 1 else ".mp4") - - cmd.extend(["--output-topdown", output_topdown_map]) - cmd.extend(["--output-agent", output_agent_map]) - - print(f"Running render: {' '.join(cmd[:6])}...") - result = subprocess.run(cmd, cwd=os.getcwd(), capture_output=True, text=True, timeout=1200, env=env_vars) - - vids_exist = os.path.exists(output_topdown_map) and os.path.exists(output_agent_map) - print(f"Render exit code: {result.returncode}, vids_exist: {vids_exist}") - if result.returncode != 0 and result.stderr: - print(f"Render stderr: {result.stderr[-500:]}") - - if result.returncode == 0 or 
(result.returncode == 1 and vids_exist): - videos = [ - ("output_topdown", output_topdown_map, f"epoch_{epoch:06d}_map{i:02d}_topdown.mp4"), - ("output_agent", output_agent_map, f"epoch_{epoch:06d}_map{i:02d}_agent.mp4"), - ] - - for vid_type, source_vid, target_filename in videos: - if os.path.exists(source_vid): - target_path = os.path.join(video_output_dir, target_filename) - shutil.move(source_vid, target_path) - generated_videos[vid_type].append(target_path) - else: - print(f"Video generation completed but {source_vid} not found") - if result.stdout: - print(f"StdOUT: {result.stdout}") - if result.stderr: - print(f"StdERR: {result.stderr}") - else: - print(f"C rendering failed (map index {i}) with exit code {result.returncode}: {result.stderr}") - - if render_async: - render_queue.put( - { - "videos": generated_videos, - "step": global_step, - "prefix": wandb_prefix, - } - ) - elif wandb_log and wandb_run: - import wandb - - payload = {} - if generated_videos["output_topdown"]: - payload[f"{wandb_prefix}/world_state"] = [ - wandb.Video(p, format="mp4") for p in generated_videos["output_topdown"] - ] - if generated_videos["output_agent"]: - payload[f"{wandb_prefix}/agent_view"] = [ - wandb.Video(p, format="mp4") for p in generated_videos["output_agent"] - ] - if payload: - print(f"Logging {len(payload)} video keys to wandb: {list(payload.keys())}") - payload["train_step"] = global_step - wandb_run.log(payload) - - except subprocess.TimeoutExpired: - print("C rendering timed out") - except Exception as e: - print(f"Failed to render videos: {e}") - - -def render_videos_and_cleanup(cleanup_files=None, **render_kwargs): - """Wrapper that runs render_videos then cleans up temp files. - - Intended as the target for multiprocessing.Process so that temp files - (bin weights, generated INI) are cleaned up inside the spawned process. 
- """ - try: - render_videos(**render_kwargs) - finally: - for f in cleanup_files or []: - try: - if os.path.exists(f): - os.remove(f) - except OSError: - pass