diff --git a/trainer/cli.py b/trainer/cli.py index b9f05fc..2d6afc7 100644 --- a/trainer/cli.py +++ b/trainer/cli.py @@ -59,9 +59,13 @@ def __post_init__(self, motion, log_level, log_file): def _load_robot(self): self._robot = simulation.reset(self._scene, self.robot) - def train(self, output, chunk_length=3, num_chunk=50, **kwargs): - trained = trainer.train(self._scene, self._motion, self._robot, - chunk_length, num_chunk, **kwargs) + def train(self, output, **kwargs): + def make_scene(_): + gui_client = BulletClient(connection_mode=pybullet.DIRECT) + scene = Scene(self.timestep, self.frame_skip, client=gui_client) + robot = simulation.reset(scene, self.robot) + return scene, robot + trained = trainer.train(self._motion, make_scene, **kwargs) trained.dump(output) def preview(self): diff --git a/trainer/evaluation.py b/trainer/evaluation.py index 9714340..960ab7d 100644 --- a/trainer/evaluation.py +++ b/trainer/evaluation.py @@ -7,7 +7,7 @@ from .simulation import apply_joints -def calc_effector_reward(motion, robot, frame, ke, wl, wr): +def calc_effector_reward(motion, robot, frame, *, ke, wl, wr): diff = 0 for name, effector in frame.effectors.items(): pose = robot.link_state(name).pose @@ -26,7 +26,7 @@ def calc_effector_reward(motion, robot, frame, ke, wl, wr): return - math.exp(normalized) + 1 -def calc_stabilization_reward(frame, pre_positions, ks): +def calc_stabilization_reward(frame, pre_positions, *, ks): if pre_positions is None: return 0 @@ -35,11 +35,11 @@ def calc_stabilization_reward(frame, pre_positions, ks): return - math.exp(normalized) + 1 -def calc_reward(motion, robot, frame, pre_positions, we=1, ws=0.1, ke=1, ks=1, wl=1, wr=0.005): +def calc_reward(motion, robot, frame, pre_positions, *, we=1, ws=0.1, ke=1, ks=1, wl=1, wr=0.005): # TODO: Use more clear naming of hyperparameters - e = calc_effector_reward(motion, robot, frame, ke, wl, wr) - s = calc_stabilization_reward(frame, pre_positions, ks) + e = calc_effector_reward(motion, robot, frame, ke=ke, wl=wl, wr=wr) + s = calc_stabilization_reward(frame, pre_positions, ks=ks) return e * we + s * ws diff --git a/trainer/train.py b/trainer/train.py index f01b0ec..9f42b78 100644 --- a/trainer/train.py +++ b/trainer/train.py @@ -1,7 +1,10 @@ import numpy as np -from typing import Dict +from typing import Dict, Optional, List, Callable, Tuple import dataclasses from logging import getLogger +import math +from concurrent import futures +import threading from nevergrad.optimization import optimizerlib from nevergrad.instrumentation import InstrumentedFunction @@ -39,47 +42,86 @@ def save(scene: Scene, robot: Robot): return StateWithJoints(scene.save_state(), torques) -def train_chunk(scene: Scene, motion: flom.Motion, robot: Robot, start: float, init_weights: np.ndarray, init_state: StateWithJoints, algorithm: str = 'OnePlusOne', num_iteration: int = 1000, weight_factor: float = 0.01, stddev: float = 1, **kwargs): +@dataclasses.dataclass +class Env: + scene: Scene + robot: Robot + + state: Optional[StateWithJoints] = None + + def __post_init__(self): + self.save() + + def save(self): + self.state = StateWithJoints.save(self.scene, self.robot) + + def restore(self): + self.state.restore(self.scene, self.robot) + + +def train_chunk(motion: flom.Motion, envs: List[Env], start: float, init_weights: np.ndarray, *, algorithm: str = 'OnePlusOne', num_iteration: int = 1000, weight_factor: float = 0.01, stddev: float = 1, **kwargs): weight_shape = np.array(init_weights).shape - def step(weights): - init_state.restore(scene, robot) + thread_envs = {} # type: Dict[int, Env] + + def step(weights, env: Env = None): + env = env or thread_envs[threading.get_ident()] + + env.restore() reward_sum = 0 - start_ts = scene.ts + start_ts = env.scene.ts - pre_positions = try_get_pre_positions(scene, motion, start=start) + pre_positions = try_get_pre_positions(env.scene, motion, start=start) for init_weight, frame_weight in zip(init_weights, weights): - frame = motion.frame_at(start + scene.ts - start_ts) + frame = motion.frame_at(start + env.scene.ts - start_ts) frame.positions = apply_weights( - frame.positions, (init_weight + frame_weight) * weight_factor) - apply_joints(robot, frame.positions) + frame.positions, init_weight + frame_weight * weight_factor) + apply_joints(env.robot, frame.positions) - scene.step() + env.scene.step() - reward_sum += calc_reward(motion, robot, frame, pre_positions, **kwargs) + reward_sum += calc_reward(motion, env.robot, frame, pre_positions, **kwargs) pre_positions = frame.positions return -reward_sum + used_idx = 0 + lock = threading.Lock() + def register_thread(): + nonlocal used_idx, thread_envs + with lock: + thread_envs[threading.get_ident()] = envs[used_idx] + used_idx += 1 + weights_param = Gaussian(mean=0, std=stddev, shape=weight_shape) inst_step = InstrumentedFunction(step, weights_param) optimizer = optimizerlib.registry[algorithm]( - dimension=inst_step.dimension, budget=num_iteration, num_workers=1) - recommendation = optimizer.optimize(inst_step) + dimension=inst_step.dimension, budget=num_iteration, num_workers=len(envs)) + + with futures.ThreadPoolExecutor(max_workers=optimizer.num_workers, initializer=register_thread) as executor: + recommendation = optimizer.optimize(inst_step, executor=executor) weights = np.reshape(recommendation, weight_shape) - reward = step(weights) + for e in envs: + reward = step(weights, e) + e.save() + + return reward, weights * weight_factor + + +def train(motion: flom.Motion, make_scene: Callable[[int], Tuple[Scene, Robot]], *, num_workers: int = 5, chunk_length: int = 3, num_chunk: Optional[int] = None, **kwargs): + envs = [Env(*make_scene(i)) for i in range(num_workers)] + first_env = envs[0] - state = StateWithJoints.save(scene, robot) - return reward, weights, state + chunk_duration = first_env.scene.dt * chunk_length + if num_chunk is None: + num_chunk = math.ceil(motion.length() / chunk_duration) -def train(scene, motion, robot, chunk_length=3, num_chunk=100, weight_factor=0.01, **kwargs): - chunk_duration = scene.dt * chunk_length total_length = chunk_duration * num_chunk log.info(f"chunk duration: {chunk_duration} s") log.info(f"motion length: {motion.length()} s") @@ -88,13 +130,12 @@ def train(scene, motion, robot, chunk_length=3, num_chunk=100, weight_factor=0.0 if total_length < motion.length(): log.warning(f"A total length to train is shorter than the length of motion") - num_frames = int(motion.length() / scene.dt) + num_frames = int(motion.length() / first_env.scene.dt) num_joints = len(list(motion.joint_names())) # TODO: Call len() directly weights = np.zeros(shape=(num_frames, num_joints)) log.info(f"shape of weights: {weights.shape}") log.debug(f"kwargs: {kwargs}") - last_state = StateWithJoints.save(scene, robot) for chunk_idx in range(num_chunk): start = chunk_idx * chunk_duration start_idx = chunk_idx * chunk_length % num_frames @@ -102,8 +143,7 @@ def train(scene, motion, robot, chunk_length=3, num_chunk=100, weight_factor=0.0 r = range(start_idx, start_idx + chunk_length) in_weights = [weights[i % num_frames] for i in r] log.info(f"start training chunk {chunk_idx} ({start}~)") - reward, out_weights, last_state = train_chunk( - scene, motion, robot, start, in_weights, last_state, weight_factor=weight_factor, **kwargs) + reward, out_weights = train_chunk(motion, envs, start, in_weights, **kwargs) for i, w in zip(r, out_weights): weights[i % num_frames] = w @@ -117,8 +157,8 @@ def train(scene, motion, robot, chunk_length=3, num_chunk=100, weight_factor=0.0 new_motion.set_effector_weight(name, motion.effector_weight(name)) for i, frame_weight in enumerate(weights): - t = i * scene.dt + t = i * first_env.scene.dt new_frame = motion.frame_at(t) - new_frame.positions = apply_weights(new_frame.positions, frame_weight * weight_factor) + new_frame.positions = apply_weights(new_frame.positions, frame_weight) new_motion.insert_keyframe(t, new_frame) return new_motion