优化PPO自适应回充与泛化特征
This commit is contained in:
@@ -58,6 +58,7 @@ class Preprocessor:
|
||||
(1, 1),
|
||||
)
|
||||
INF_DIST = 1e6
|
||||
ABS_POS_FEATURE_SCALE = 0.2
|
||||
|
||||
def __init__(self):
|
||||
self.reset()
|
||||
@@ -128,6 +129,10 @@ class Preprocessor:
|
||||
self.charger_energy_cost = float(self.GRID_SIZE)
|
||||
self.charger_safety_buffer = 0.0
|
||||
self.charger_safety_margin = 0.0
|
||||
self.recharge_enter_margin = 0.0
|
||||
self.recharge_leave_margin = 0.0
|
||||
self.recharge_low_battery_ratio = 0.35
|
||||
self.full_charge_leave_ratio = 0.96
|
||||
self.battery_margin = 0.0
|
||||
self.has_charger = False
|
||||
self.low_battery = False
|
||||
@@ -388,21 +393,33 @@ class Preprocessor:
|
||||
def _update_recharge_mode(self):
|
||||
"""Enter/exit low-battery recharge mode."""
|
||||
battery_ratio = self.battery / max(self.battery_max, 1)
|
||||
self.low_battery = battery_ratio < 0.35
|
||||
|
||||
if not self.has_charger:
|
||||
self.recharge_mode = False
|
||||
self.charger_safety_margin = float(self.battery)
|
||||
self.recharge_enter_margin = 0.0
|
||||
self.recharge_leave_margin = 0.0
|
||||
self.recharge_low_battery_ratio = 0.35
|
||||
self.full_charge_leave_ratio = 0.96
|
||||
self.low_battery = battery_ratio < self.recharge_low_battery_ratio
|
||||
return
|
||||
|
||||
self.charger_energy_cost = float(max(self.nearest_charger_path_dist, 0.0))
|
||||
self.charger_safety_buffer = self._charger_safety_buffer()
|
||||
self.charger_safety_margin = float(self.battery) - self.charger_energy_cost - self.charger_safety_buffer
|
||||
self.recharge_enter_margin = self._recharge_enter_margin()
|
||||
self.recharge_leave_margin = self._recharge_leave_margin()
|
||||
self.recharge_low_battery_ratio = self._recharge_low_battery_ratio()
|
||||
self.full_charge_leave_ratio = self._full_charge_leave_ratio()
|
||||
|
||||
should_recharge = self.charger_safety_margin <= 0.0 or battery_ratio < 0.28
|
||||
safe_to_leave = self.charger_safety_margin > 18.0 and battery_ratio > 0.65
|
||||
self.low_battery = battery_ratio < self.recharge_low_battery_ratio
|
||||
should_recharge = self.charger_safety_margin <= self.recharge_enter_margin or self.low_battery
|
||||
safe_to_leave = (
|
||||
battery_ratio >= self.full_charge_leave_ratio
|
||||
and self.charger_safety_margin >= self.recharge_leave_margin
|
||||
)
|
||||
|
||||
if self.on_charger and (battery_ratio > 0.85 or safe_to_leave):
|
||||
if self.on_charger and safe_to_leave:
|
||||
self.recharge_mode = False
|
||||
elif should_recharge:
|
||||
self.recharge_mode = True
|
||||
@@ -461,6 +478,66 @@ class Preprocessor:
|
||||
obstacle_buffer = 18.0 * float(self.local_obstacle_ratio)
|
||||
return float(np.clip(base + distance_buffer + obstacle_buffer, 24.0, 64.0))
|
||||
|
||||
def _recharge_enter_margin(self):
    """Adaptive margin for entering recharge mode before the battery is barely enough.

    The margin is the sum of four terms, clamped to ``[8.0, 48.0]``:

    * base: 2.5% of ``battery_max``, never below 8.0;
    * path: 12% of ``nearest_charger_path_dist`` (negative distances count
      as 0), capped at 18.0;
    * obstacle: 20.0 scaled by ``local_obstacle_ratio``;
    * recovery: 2.0 per step accumulated in ``recharge_no_progress_steps``
      plus ``fake_charger_steps``, capped at 10.0.

    Returns:
        float: threshold that ``charger_safety_margin`` is compared against
        when deciding whether to enter recharge mode.
    """
    base = max(8.0, 0.025 * float(self.battery_max))
    # Guard against a negative distance so the path term never reduces the margin.
    path_dist = float(max(self.nearest_charger_path_dist, 0.0))
    path_margin = min(18.0, 0.12 * path_dist)
    obstacle_margin = 20.0 * float(self.local_obstacle_ratio)
    stalled_steps = float(self.recharge_no_progress_steps + self.fake_charger_steps)
    recovery_margin = min(10.0, 2.0 * stalled_steps)
    total = base + path_margin + obstacle_margin + recovery_margin
    return float(np.clip(total, 8.0, 48.0))
|
||||
|
||||
def _recharge_leave_margin(self):
    """Adaptive safety margin required before leaving a charger.

    The margin is the sum of three terms, clamped to ``[28.0, 88.0]``:

    * base: 10% of ``battery_max``, never below 28.0;
    * path: 18% of ``nearest_charger_path_dist`` (negative distances count
      as 0), capped at 24.0;
    * obstacle: 16.0 scaled by ``local_obstacle_ratio``.

    Returns:
        float: minimum ``charger_safety_margin`` required before the agent
        is considered safe to leave the charger.
    """
    base = max(28.0, 0.10 * float(self.battery_max))
    # Guard against a negative distance so the path term never reduces the margin.
    path_dist = float(max(self.nearest_charger_path_dist, 0.0))
    path_margin = min(24.0, 0.18 * path_dist)
    obstacle_margin = 16.0 * float(self.local_obstacle_ratio)
    return float(np.clip(base + path_margin + obstacle_margin, 28.0, 88.0))
|
||||
|
||||
def _recharge_low_battery_ratio(self):
    """Adaptive low-battery ratio based on route length and local obstacle density.

    Starts from 0.32 and adds:

    * up to 0.10 for path pressure (``nearest_charger_path_dist`` divided
      by ``battery_max``, scaled by 0.55);
    * up to 0.06 for ``local_obstacle_ratio`` (scaled by 0.20);
    * a flat 0.03 when either ``recharge_no_progress_steps`` or
      ``fake_charger_steps`` is positive.

    Returns:
        float: battery-ratio threshold in ``[0.32, 0.48]`` below which the
        battery is treated as low.
    """
    path_dist = float(max(self.nearest_charger_path_dist, 0.0))
    # The denominator is floored at 1.0 to avoid dividing by zero.
    path_pressure = path_dist / max(float(self.battery_max), 1.0)
    ratio = 0.32 + min(0.10, 0.55 * path_pressure) + min(0.06, 0.20 * float(self.local_obstacle_ratio))
    if self.recharge_no_progress_steps > 0 or self.fake_charger_steps > 0:
        ratio += 0.03
    return float(np.clip(ratio, 0.32, 0.48))
|
||||
|
||||
def _full_charge_leave_ratio(self):
    """Adaptive near-full threshold for leaving a charger.

    Starts from 0.94 and adds:

    * 0.03 scaled by ``1 - _norm(step_no, max_step)``
      (NOTE(review): presumably the remaining fraction of the episode;
      confirm against ``_norm``);
    * up to 0.02 for path pressure (``nearest_charger_path_dist`` divided
      by ``battery_max``, scaled by 0.10);
    * up to 0.01 for ``local_obstacle_ratio`` (scaled by 0.05).

    Returns:
        float: battery-ratio threshold in ``[0.94, 0.985]``.
    """
    remaining_step_ratio = 1.0 - _norm(self.step_no, self.max_step)
    path_dist = float(max(self.nearest_charger_path_dist, 0.0))
    # The denominator is floored at 1.0 to avoid dividing by zero.
    path_pressure = path_dist / max(float(self.battery_max), 1.0)
    ratio = 0.94 + 0.03 * remaining_step_ratio + min(0.02, 0.10 * path_pressure)
    ratio += min(0.01, 0.05 * float(self.local_obstacle_ratio))
    return float(np.clip(ratio, 0.94, 0.985))
|
||||
|
||||
def _recharge_risk_score(self):
    """Risk score in [0, 1] used to scale recharge rewards and penalties.

    Returns 0.0 when ``has_charger`` is false. Otherwise it is a weighted
    blend, clamped to ``[0.0, 1.0]``:

    * 0.55 x margin risk: how far ``charger_safety_margin`` falls short of
      ``recharge_enter_margin``, relative to
      ``charger_safety_buffer + recharge_enter_margin``;
    * 0.35 x low-battery risk: how far the battery ratio falls below
      ``recharge_low_battery_ratio``, relative to that ratio;
    * 0.10 x progress risk: ``recharge_no_progress_steps / 5``, capped at 1.

    Returns:
        float: the combined risk score.
    """
    if not self.has_charger:
        return 0.0

    battery_ratio = self.battery / max(self.battery_max, 1)

    margin_deficit = max(0.0, self.recharge_enter_margin - self.charger_safety_margin)
    # Denominators are floored so zero buffers or ratios cannot divide by zero.
    margin_risk = margin_deficit / max(self.charger_safety_buffer + self.recharge_enter_margin, 1.0)

    low_battery_risk = max(0.0, self.recharge_low_battery_ratio - battery_ratio)
    low_battery_risk /= max(self.recharge_low_battery_ratio, 1e-6)

    progress_risk = min(1.0, float(self.recharge_no_progress_steps) / 5.0)

    score = 0.55 * margin_risk + 0.35 * low_battery_risk + 0.10 * progress_risk
    return float(np.clip(score, 0.0, 1.0))
|
||||
|
||||
def useful_charge_reward_weight(self):
    """Adaptive reward weight for charging that happens under real battery pressure.

    The weight is ``1.2 + 1.1 * risk + mode_bonus``, clamped to
    ``[1.2, 2.6]``, where:

    * ``risk`` is the larger of ``_recharge_risk_score()`` and the previous
      step's low-battery shortfall (``recharge_low_battery_ratio`` minus
      the previous battery ratio, relative to that threshold);
    * ``mode_bonus`` is 0.25 when ``was_recharge_mode`` or
      ``prev_low_battery`` is truthy, else 0.0.

    Returns:
        float: the reward weight.
    """
    prev_battery_ratio = self.prev_battery / max(self.prev_battery_max, 1)

    prev_low_risk = max(0.0, self.recharge_low_battery_ratio - prev_battery_ratio)
    # The denominator is floored so a zero threshold cannot divide by zero.
    prev_low_risk /= max(self.recharge_low_battery_ratio, 1e-6)

    risk = max(self._recharge_risk_score(), prev_low_risk)
    mode_bonus = 0.25 if self.was_recharge_mode or self.prev_low_battery else 0.0
    return float(np.clip(1.2 + 1.1 * risk + mode_bonus, 1.2, 2.6))
|
||||
|
||||
def battery_fail_penalty(self):
    """Adaptive terminal penalty for running out of battery before max steps.

    The penalty is ``5.5 + 2.5 * early_fail_risk + 1.0 * risk``, clamped to
    ``[5.5, 9.0]``, where:

    * ``early_fail_risk`` is ``1 - _norm(step_no, max_step)``
      (NOTE(review): presumably larger the earlier the episode fails;
      confirm against ``_norm``);
    * ``risk`` is the larger of ``_recharge_risk_score()`` and the path
      pressure (``charger_energy_cost / battery_max``, capped at 1.0).

    Returns:
        float: a positive magnitude; callers subtract it from the reward.
    """
    step_ratio = _norm(self.step_no, self.max_step)
    early_fail_risk = 1.0 - step_ratio
    # The denominator is floored at 1.0 to avoid dividing by zero.
    path_pressure = float(max(self.charger_energy_cost, 0.0)) / max(float(self.battery_max), 1.0)
    risk = max(self._recharge_risk_score(), min(1.0, path_pressure))
    return float(np.clip(5.5 + 2.5 * early_fail_risk + 1.0 * risk, 5.5, 9.0))
|
||||
|
||||
def _min_charger_range_dist(self, x, z):
|
||||
if not self.charger_rects:
|
||||
return float(self.GRID_SIZE)
|
||||
@@ -490,8 +567,8 @@ class Preprocessor:
|
||||
[1] battery_ratio battery level / 电量比 [0,1]
|
||||
[2] cleaning_progress cleaned ratio / 已清扫比例 [0,1]
|
||||
[3] remaining_dirt remaining dirt ratio / 剩余污渍比例 [0,1]
|
||||
[4] pos_x_norm x position / x 坐标归一化 [0,1]
|
||||
[5] pos_z_norm z position / z 坐标归一化 [0,1]
|
||||
[4] pos_x_weak weak x position / 弱化后的 x 坐标 [0.4,0.6]
|
||||
[5] pos_z_weak weak z position / 弱化后的 z 坐标 [0.4,0.6]
|
||||
[6] ray_N_dirt north ray distance / 向上(z-)方向最近污渍距离
|
||||
[7] ray_E_dirt east ray distance / 向右(x+)方向
|
||||
[8] ray_S_dirt south ray distance / 向下(z+)方向
|
||||
@@ -521,8 +598,8 @@ class Preprocessor:
|
||||
remaining_dirt = 1.0 - cleaning_progress
|
||||
|
||||
hx, hz = self.cur_pos
|
||||
pos_x_norm = _norm(hx, self.GRID_SIZE)
|
||||
pos_z_norm = _norm(hz, self.GRID_SIZE)
|
||||
pos_x_weak = self._weak_abs_position_feature(hx)
|
||||
pos_z_weak = self._weak_abs_position_feature(hz)
|
||||
|
||||
# 4-directional ray to find nearest dirt
|
||||
# 四方向射线找最近污渍距离
|
||||
@@ -560,8 +637,8 @@ class Preprocessor:
|
||||
battery_ratio,
|
||||
cleaning_progress,
|
||||
remaining_dirt,
|
||||
pos_x_norm,
|
||||
pos_z_norm,
|
||||
pos_x_weak,
|
||||
pos_z_weak,
|
||||
ray_dirt[0],
|
||||
ray_dirt[1],
|
||||
ray_dirt[2],
|
||||
@@ -588,6 +665,10 @@ class Preprocessor:
|
||||
dtype=np.float32,
|
||||
)
|
||||
|
||||
def _weak_abs_position_feature(self, value):
    """Compress a normalized absolute coordinate toward 0.5.

    The coordinate is normalized with ``_norm(value, GRID_SIZE)`` and its
    offset from 0.5 is scaled by ``ABS_POS_FEATURE_SCALE``, so the feature
    carries only a weak absolute-position signal.

    Args:
        value: raw grid coordinate (x or z).

    Returns:
        The weakened position feature, centered on 0.5.
    """
    pos_norm = _norm(value, self.GRID_SIZE)
    centered = pos_norm - 0.5
    return 0.5 + self.ABS_POS_FEATURE_SCALE * centered
|
||||
|
||||
def _calc_nearest_dirt_dist(self):
|
||||
"""Find nearest dirt path distance from local view.
|
||||
|
||||
@@ -668,7 +749,7 @@ class Preprocessor:
|
||||
if self.recharge_mode:
|
||||
legal = self._filter_recharge_actions(legal)
|
||||
legal = self._filter_recharge_escape_actions(legal, safe_legal)
|
||||
elif self.on_charger and self.battery / max(self.battery_max, 1) > 0.65:
|
||||
elif self.on_charger and self.battery / max(self.battery_max, 1) >= self.full_charge_leave_ratio:
|
||||
legal = self._filter_leave_charger_actions(legal)
|
||||
return list(legal)
|
||||
|
||||
@@ -886,7 +967,7 @@ class Preprocessor:
|
||||
self.prev_low_battery or self.was_recharge_mode or prev_battery_ratio < 0.45
|
||||
)
|
||||
if useful_charge:
|
||||
charge_reward += 1.0
|
||||
charge_reward += self.useful_charge_reward_weight()
|
||||
elif self.charge_delta > 0 and battery_ratio > 0.65:
|
||||
charge_reward -= 0.25 * min(self.charge_delta, 3)
|
||||
|
||||
@@ -894,11 +975,13 @@ class Preprocessor:
|
||||
dist_delta = float(
|
||||
np.clip(self.last_nearest_charger_path_dist - self.nearest_charger_path_dist, -4.0, 4.0)
|
||||
)
|
||||
approach_scale = 0.06 if self.charger_safety_margin <= 0 else 0.04
|
||||
retreat_scale = 0.03 if self.charger_safety_margin <= 0 else 0.02
|
||||
recharge_risk = self._recharge_risk_score()
|
||||
approach_scale = 0.04 + 0.04 * recharge_risk
|
||||
retreat_scale = 0.02 + 0.03 * recharge_risk
|
||||
charge_reward += approach_scale * dist_delta if dist_delta > 0 else retreat_scale * dist_delta
|
||||
if self.charger_safety_margin < 0:
|
||||
charge_reward -= min(0.35, abs(self.charger_safety_margin) / max(self.battery_max, 1))
|
||||
if self.charger_safety_margin < self.recharge_enter_margin:
|
||||
safety_shortage = self.recharge_enter_margin - self.charger_safety_margin
|
||||
charge_reward -= min(0.55, safety_shortage / max(self.battery_max, 1))
|
||||
elif self.on_charger and battery_ratio > 0.65:
|
||||
charge_reward -= 0.08
|
||||
|
||||
@@ -926,7 +1009,7 @@ class Preprocessor:
|
||||
terminal_penalty = 0.0
|
||||
if self.terminated and not self.truncated:
|
||||
if self.battery <= 0 or self.remaining_charge <= 0:
|
||||
terminal_penalty -= 4.0
|
||||
terminal_penalty -= self.battery_fail_penalty()
|
||||
elif self.npc_danger or self.nearest_npc_dist <= 1:
|
||||
terminal_penalty -= 3.0
|
||||
|
||||
|
||||
@@ -158,7 +158,7 @@ class EpisodeRunner:
|
||||
result_str = "WIN"
|
||||
else:
|
||||
if fm.battery <= 0 or remaining_charge <= 0:
|
||||
final_reward = -4.0 + 6.0 * cleaning_ratio
|
||||
final_reward = -fm.battery_fail_penalty() + 4.0 * cleaning_ratio
|
||||
result_str = "BATTERY_FAIL"
|
||||
elif fm.npc_danger or fm.nearest_npc_dist <= 1:
|
||||
final_reward = -3.0 + 6.0 * cleaning_ratio
|
||||
|
||||
Reference in New Issue
Block a user