Disruptor Modules#
Before introducing the disruptor module, we recall that the RL problem can be formulated as a process involving several key concepts: an agent, state, action, reward, and an environment. Specifically, at each time \(t\), the environment generates a state \(s_t\) and a reward \(r_t\), and sends them to the agent, and the agent chooses an action \(a_t\) and sends it back to the environment to generate the next state \(s_{t+1}\) conditioned on the current state \(s_t\) and the action \(a_t\).
Considering this, in this benchmark, we consider extensive potential uncertainty/disturbance/generalizable events that happen in this process (including both training and testing phases) during any places, with any modes, and at any time, summarized in the following table.
Perturbation modessources |
Observed state |
Observed reward |
Action |
Environment/task |
|---|---|---|---|---|
Random |
✅ |
✅ |
✅ |
✅ |
Adversarial |
✅ |
✅ |
/ |
/ |
Set arbitrarily |
/ |
/ |
/ |
✅ |
Semantic Domain shift |
/ |
/ |
/ |
✅ |
Those perturbation events can be generally categorized from three different perspectives:
Sources: which component is perturbed/attacked. - Agent’s observed state: The agent observes a noisy/attacked “state” \(\tilde{s_t}\) (diverging from the real state \(s_t\)) and uses it as the input of its policy to determine the action. - Agent’s observed reward: The agent observes a noisy/attacked “reward” \(\tilde{r_t}\) (differing from the real immediate reward \(r_t\) obtained from the environment) and constructs their policy according to it. - Action: The action \(a_t\) chosen by the agent is contaminated before being sent to the environment. Namely, a perturbed action \(\tilde{a_t}\) serves as the input of the environment for the next step. - Environment: An environment includes both immediate reward function \(r\) and dynamic function \(p_t\). An agent may interact with a shifted or nonstationary environment.
Modes: what kind of perturbation is imposed on. - Random: The nominal variable will be added by some random noise following some distributions, such as Gaussian or uniform distribution. This mode can be used to all perturbation sources. - Adversarial: An adversarial attacker will choose the perturbed output within some admissible set to degrade the agent’s performance. This mode can be used for the perturbations towards observation and action. - Set arbitrarily: An environment can be set to any fixed one within some pre-scribed uncertainty set of the environments. - Semantic-domain-shifted: We offer some partially-similar environment/tasks while with some semantic diversity (such as different goals) for domain generalization or transfer learning tasks.
Frequency: when does the perturbation happen. Viewed through the lens of time, the perturbations can happen at different periods during training and testing process, even with different frequency. We provide interactive modes that support step-wise varying interaction between disruptors, agents, and environments. So the user can choose to apply perturbations at any point in the dimension of time in any way.
A Simple Example: Ant Environment#
This module implements the AntEnv class for robust reinforcement learning using the MuJoCo physics engine.
Imports#
import numpy as np
from os import path
from robust_gymnasium import utils
from robust_gymnasium.envs.robust_mujoco import MujocoEnv
from robust_gymnasium.spaces import Box
import xml.etree.ElementTree as ET
import random
from robust_gymnasium.envs.llm_guide_robust.gpt_collect import gpt_call
DEFAULT_CAMERA_CONFIG = {
"distance": 4.0,
}
Classes#
AntEnv#
class AntEnv(MujocoEnv, utils.EzPickle):
metadata = {
"render_modes": [
"human",
"rgb_array",
"depth_array",
],
"render_fps": 20,
}
def __init__(
self,
xml_file="ant.xml",
ctrl_cost_weight=0.5,
use_contact_forces=False,
contact_cost_weight=5e-4,
healthy_reward=1.0,
terminate_when_unhealthy=True,
healthy_z_range=(0.2, 1.0),
contact_force_range=(-1.0, 1.0),
reset_noise_scale=0.1,
exclude_current_positions_from_observation=True,
**kwargs,
):
utils.EzPickle.__init__(
self,
xml_file,
ctrl_cost_weight,
use_contact_forces,
contact_cost_weight,
healthy_reward,
terminate_when_unhealthy,
healthy_z_range,
contact_force_range,
reset_noise_scale,
exclude_current_positions_from_observation,
**kwargs,
)
self._ctrl_cost_weight = ctrl_cost_weight
self._contact_cost_weight = contact_cost_weight
self._healthy_reward = healthy_reward
self._terminate_when_unhealthy = terminate_when_unhealthy
self._healthy_z_range = healthy_z_range
self._contact_force_range = contact_force_range
self._reset_noise_scale = reset_noise_scale
self._use_contact_forces = use_contact_forces
self._exclude_current_positions_from_observation = (
exclude_current_positions_from_observation
)
obs_shape = 27
if not exclude_current_positions_from_observation:
obs_shape += 2
if use_contact_forces:
obs_shape += 84
observation_space = Box(
low=-np.inf, high=np.inf, shape=(obs_shape,), dtype=np.float64
)
MujocoEnv.__init__(
self,
xml_file,
5,
observation_space=observation_space,
default_camera_config=DEFAULT_CAMERA_CONFIG,
**kwargs,
)
self.xml_file = xml_file
self.xml_file_original = "ant_original.xml"
self.previous_reward = 0
self.llm_disturb_iteration = 0
@property
def healthy_reward(self):
return (
float(self.is_healthy or self._terminate_when_unhealthy)
* self._healthy_reward
)
def control_cost(self, action):
control_cost = self._ctrl_cost_weight * np.sum(np.square(action))
return control_cost
@property
def contact_forces(self):
raw_contact_forces = self.data.cfrc_ext
min_value, max_value = self._contact_force_range
contact_forces = np.clip(raw_contact_forces, min_value, max_value)
return contact_forces
@property
def contact_cost(self):
contact_cost = self._contact_cost_weight * np.sum(
np.square(self.contact_forces)
)
return contact_cost
@property
def is_healthy(self):
state = self.state_vector()
min_z, max_z = self._healthy_z_range
is_healthy = np.isfinite(state).all() and min_z <= state[2] <= max_z
return is_healthy
@property
def terminated(self):
terminated = not self.is_healthy if self._terminate_when_unhealthy else False
return terminated
def step(self, robust_input):
action = robust_input["action"]
args = robust_input["robust_config"]
mu = args.noise_mu
sigma = args.noise_sigma
if args.noise_factor == "robust_force":
self.modify_xml(self.fullpath, args)
if args.noise_factor == "robust_shape":
self.modify_xml(self.fullpath, args)
if args.noise_factor == "action":
self.llm_disturb_iteration += 1
if self.llm_disturb_iteration % args.llm_disturb_interval == 0:
if args.noise_type == "gauss":
action = action + random.gauss(mu, sigma) # robust setting
elif args.noise_type == "shift":
action = action + args.noise_shift
elif args.noise_type =="uniform":
action = action + random.uniform(args.uniform_low, args.uniform_high)
else:
action = action
else:
action = action
xy_position_before = self.get_body_com("torso")[:2].copy()
self.do_simulation(action, self.frame_skip)
xy_position_after = self.get_body_com("torso")[:2].copy()
xy_velocity = (xy_position_after - xy_position_before) / self.dt
x_velocity, y_velocity = xy_velocity
forward_reward = x_velocity
healthy_reward = self.healthy_reward
rewards = forward_reward + healthy_reward
costs = ctrl_cost = self.control_cost(action)
terminated = self.terminated
if args.noise_factor == "state":
self.llm_disturb_iteration += 1
if self.llm_disturb_iteration % args.llm_disturb_interval == 0:
if args.noise_type == "gauss":
observation = self._get_obs() + random.gauss(mu, sigma) # robust setting
elif args.noise_type == "shift":
observation = self._get_obs() + args.noise_shift
elif args.noise_type =="uniform":
observation = observation + random.uniform(args.uniform_low, args.uniform_high)
else:
observation = self._get_obs()
else:
observation = self._get_obs()
fullpath_original = self.expand_model_path(self.xml_file_original)
info = {
"reward_forward": forward_reward,
"reward_ctrl": -ctrl_cost,
"reward_survive": healthy_reward,
"x_position": xy_position_after[0],
"y_position": xy_position_after[1],
"distance_from_origin": np.linalg.norm(xy_position_after, ord=2),
"x_velocity": x_velocity,
"y_velocity": y_velocity,
"forward_reward": forward_reward,
"source_file_path": fullpath_original,
"target_file_path": self.fullpath,
}
if self._use_contact_forces:
contact_cost = self.contact_cost
costs += contact_cost
info["reward_ctrl"] = -contact_cost
reward = rewards - costs
if args.noise_factor == "reward":
self.llm_disturb_iteration += 1
if self.llm_disturb_iteration % args.llm_disturb_interval == 0:
if args.noise_type == "gauss":
reward = reward + random.gauss(mu, sigma) # robust setting
elif args.noise_type == "shift":
reward = reward + args.noise_shift
elif args.noise_type =="uniform":
reward = reward + random.uniform(args.uniform_low, args.uniform_high)
else:
reward = reward
else:
reward = reward
if self.render_mode == "human":
self.render()
# truncation=False as the time limit is handled by the `TimeLimit` wrapper added during `make`
# self.modify_xml(xml_path_completion(xml_path), str(handle_width_left_door) + ' 0 ' + str(handle_height_door)) # <body name="latch" pos="-0.175 0 -0.025">
if args.noise_factor == "robust_force" or args.noise_factor == "robust_shape":
self.replace_xml_content(fullpath_original, self.fullpath)
if args.llm_guide == "adversary":
self.llm_disturb_iteration += 1
if args.llm_guide_type == "stochastic":
if self.llm_disturb_iteration % args.llm_disturb_interval == 0:
prompt = "This is about a robust reinforcement learning setting; we want you as an adversary policy. If the current reward exceeds the previous reward value, please input some observation noises to disturb the environment and improve the learning algorithm's robustness. " \
"the current reward:" + str(reward) + ", the previous reward is" + str(self.previous_reward) \
+ "please slightly revise the current environment state values:" + str(
observation) + ", just output the revised state with its original format" \
"do not output any other things."
prompt_state = gpt_call(prompt)
observation = prompt_state
elif args.llm_guide_type == "uniform":
if self.llm_disturb_iteration % args.llm_disturb_interval == 0:
observation = gpt_call("the current observation is"+ str(observation))
self.previous_reward = reward
return observation, reward, terminated, False, info
def expand_model_path(self, file_name):
fullpath = path.join(self.model_dir, file_name)
return fullpath
def modify_xml(self, file_path, args):
tree = ET.parse(file_path)
root = tree.getroot()
if args.noise_factor == "robust_force":
for joint in root.iter("joint"):
joint.set("damping", str(random.uniform(args.damping_low, args.damping_high)))
joint.set("frictionloss", str(random.uniform(args.frictionloss_low, args.frictionloss_high)))
if args.noise_factor == "robust_shape":
for body in root.iter("body"):
body.set("pos", str(
random.uniform(args.pos_low, args.pos_high)) + ' ' + str(
random.uniform(args.pos_low, args.pos_high)) + ' ' + str(
random.uniform(args.pos_low, args.pos_high)))
tree.write(file_path)
def replace_xml_content(self, original_file_path, new_file_path):
tree = ET.parse(original_file_path)
root = tree.getroot()
tree.write(new_file_path)
def _get_obs(self):
return self.simulation_state()
def reset_model(self):
self.previous_reward = 0
return self._get_obs()