Action Disruptor#
- Action-disruptor:
The real action \(a_t\) chosen by the agent may be altered before or during execution in the environment due to implementation inaccuracies or system malfunctions. The action-disruptor models this perturbation, outputting a perturbed action \(\tilde{a}_t = D_a(a_t)\), which is then executed in the environment for the next step.
def step(self, robust_input):
action = robust_input["action"]
args = robust_input["robust_config"]
mu = args.noise_mu
sigma = args.noise_sigma
if args.noise_factor == "action":
self.llm_disturb_iteration += 1
if self.llm_disturb_iteration % args.llm_disturb_interval == 0:
if args.noise_type == "gauss":
action = action + random.gauss(mu, sigma) # robust setting
elif args.noise_type == "shift":
action = action + args.noise_shift
elif args.noise_type =="uniform":
action = action + random.uniform(args.uniform_low, args.uniform_high)
else:
action = action
else:
action = action
xy_position_before = self.get_body_com("torso")[:2].copy()
self.do_simulation(action, self.frame_skip)
xy_position_after = self.get_body_com("torso")[:2].copy()
xy_velocity = (xy_position_after - xy_position_before) / self.dt
x_velocity, y_velocity = xy_velocity
forward_reward = x_velocity
healthy_reward = self.healthy_reward
rewards = forward_reward + healthy_reward
costs = ctrl_cost = self.control_cost(action)
terminated = self.terminated
if args.noise_factor == "state":
self.llm_disturb_iteration += 1
if self.llm_disturb_iteration % args.llm_disturb_interval == 0:
if args.noise_type == "gauss":
observation = self._get_obs() + random.gauss(mu, sigma) # robust setting
elif args.noise_type == "shift":
observation = self._get_obs() + args.noise_shift
elif args.noise_type =="uniform":
observation = observation + random.uniform(args.uniform_low, args.uniform_high)
else:
observation = self._get_obs()
else:
observation = self._get_obs()
fullpath_original = self.expand_model_path(self.xml_file_original)
info = {
"reward_forward": forward_reward,
"reward_ctrl": -ctrl_cost,
"reward_survive": healthy_reward,
"x_position": xy_position_after[0],
"y_position": xy_position_after[1],
"distance_from_origin": np.linalg.norm(xy_position_after, ord=2),
"x_velocity": x_velocity,
"y_velocity": y_velocity,
"forward_reward": forward_reward,
"source_file_path": fullpath_original,
"target_file_path": self.fullpath,
}
if self._use_contact_forces:
contact_cost = self.contact_cost
costs += contact_cost
info["reward_ctrl"] = -contact_cost
reward = rewards - costs
if self.render_mode == "human":
self.render()
self.previous_reward = reward
return observation, reward, terminated, False, info
def _get_obs(self):
return self.simulation_state()
def reset_model(self):
self.previous_reward = 0
return self._get_obs()