From e99a224d865d3432c666da44a8717dd07ad7184c Mon Sep 17 00:00:00 2001
From: gqt <3217233537@qq.com>
Date: Sun, 26 Apr 2026 18:35:23 +0800
Subject: [PATCH] =?UTF-8?q?=E4=BC=98=E5=8C=96PPO=E8=87=AA=E9=80=82?=
 =?UTF-8?q?=E5=BA=94=E5=9B=9E=E5=85=85=E4=B8=8E=E6=B3=9B=E5=8C=96=E7=89=B9?=
 =?UTF-8?q?=E5=BE=81?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

---
Note: the line structure of this patch was rebuilt from a whitespace-collapsed
copy. Hunk sizes were checked against the @@ headers and the diffstat, but the
leading indentation, docstring column alignment and blank context lines are
inferred. Apply with `git apply --ignore-whitespace`, or regenerate the patch
from commit e99a224d if an exact copy is required.

 agent_ppo/feature/preprocessor.py    | 117 +++++++++++++++++++++++----
 agent_ppo/workflow/train_workflow.py |   2 +-
 2 files changed, 101 insertions(+), 18 deletions(-)

diff --git a/agent_ppo/feature/preprocessor.py b/agent_ppo/feature/preprocessor.py
index 7121dde..7e70ee5 100644
--- a/agent_ppo/feature/preprocessor.py
+++ b/agent_ppo/feature/preprocessor.py
@@ -58,6 +58,7 @@ class Preprocessor:
         (1, 1),
     )
     INF_DIST = 1e6
+    ABS_POS_FEATURE_SCALE = 0.2
 
     def __init__(self):
         self.reset()
@@ -128,6 +129,10 @@ class Preprocessor:
         self.charger_energy_cost = float(self.GRID_SIZE)
         self.charger_safety_buffer = 0.0
         self.charger_safety_margin = 0.0
+        self.recharge_enter_margin = 0.0
+        self.recharge_leave_margin = 0.0
+        self.recharge_low_battery_ratio = 0.35
+        self.full_charge_leave_ratio = 0.96
         self.battery_margin = 0.0
         self.has_charger = False
         self.low_battery = False
@@ -388,21 +393,33 @@ class Preprocessor:
     def _update_recharge_mode(self):
         """Enter/exit low-battery recharge mode."""
         battery_ratio = self.battery / max(self.battery_max, 1)
-        self.low_battery = battery_ratio < 0.35
 
         if not self.has_charger:
             self.recharge_mode = False
             self.charger_safety_margin = float(self.battery)
+            self.recharge_enter_margin = 0.0
+            self.recharge_leave_margin = 0.0
+            self.recharge_low_battery_ratio = 0.35
+            self.full_charge_leave_ratio = 0.96
+            self.low_battery = battery_ratio < self.recharge_low_battery_ratio
             return
 
         self.charger_energy_cost = float(max(self.nearest_charger_path_dist, 0.0))
         self.charger_safety_buffer = self._charger_safety_buffer()
         self.charger_safety_margin = float(self.battery) - self.charger_energy_cost - self.charger_safety_buffer
+        self.recharge_enter_margin = self._recharge_enter_margin()
+        self.recharge_leave_margin = self._recharge_leave_margin()
+        self.recharge_low_battery_ratio = self._recharge_low_battery_ratio()
+        self.full_charge_leave_ratio = self._full_charge_leave_ratio()
 
-        should_recharge = self.charger_safety_margin <= 0.0 or battery_ratio < 0.28
-        safe_to_leave = self.charger_safety_margin > 18.0 and battery_ratio > 0.65
+        self.low_battery = battery_ratio < self.recharge_low_battery_ratio
+        should_recharge = self.charger_safety_margin <= self.recharge_enter_margin or self.low_battery
+        safe_to_leave = (
+            battery_ratio >= self.full_charge_leave_ratio
+            and self.charger_safety_margin >= self.recharge_leave_margin
+        )
 
-        if self.on_charger and (battery_ratio > 0.85 or safe_to_leave):
+        if self.on_charger and safe_to_leave:
             self.recharge_mode = False
         elif should_recharge:
             self.recharge_mode = True
@@ -461,6 +478,66 @@ class Preprocessor:
         obstacle_buffer = 18.0 * float(self.local_obstacle_ratio)
         return float(np.clip(base + distance_buffer + obstacle_buffer, 24.0, 64.0))
 
+    def _recharge_enter_margin(self):
+        """Adaptive margin for entering recharge mode before the battery is barely enough."""
+        base = max(8.0, 0.025 * float(self.battery_max))
+        path_margin = min(18.0, 0.12 * float(max(self.nearest_charger_path_dist, 0.0)))
+        obstacle_margin = 20.0 * float(self.local_obstacle_ratio)
+        recovery_margin = min(10.0, 2.0 * float(self.recharge_no_progress_steps + self.fake_charger_steps))
+        return float(np.clip(base + path_margin + obstacle_margin + recovery_margin, 8.0, 48.0))
+
+    def _recharge_leave_margin(self):
+        """Adaptive safety margin required before leaving a charger."""
+        base = max(28.0, 0.10 * float(self.battery_max))
+        path_margin = min(24.0, 0.18 * float(max(self.nearest_charger_path_dist, 0.0)))
+        obstacle_margin = 16.0 * float(self.local_obstacle_ratio)
+        return float(np.clip(base + path_margin + obstacle_margin, 28.0, 88.0))
+
+    def _recharge_low_battery_ratio(self):
+        """Adaptive low-battery ratio based on route length and local obstacle density."""
+        path_pressure = float(max(self.nearest_charger_path_dist, 0.0)) / max(float(self.battery_max), 1.0)
+        ratio = 0.32 + min(0.10, 0.55 * path_pressure) + min(0.06, 0.20 * float(self.local_obstacle_ratio))
+        if self.recharge_no_progress_steps > 0 or self.fake_charger_steps > 0:
+            ratio += 0.03
+        return float(np.clip(ratio, 0.32, 0.48))
+
+    def _full_charge_leave_ratio(self):
+        """Adaptive near-full threshold for leaving a charger."""
+        remaining_step_ratio = 1.0 - _norm(self.step_no, self.max_step)
+        path_pressure = float(max(self.nearest_charger_path_dist, 0.0)) / max(float(self.battery_max), 1.0)
+        ratio = 0.94 + 0.03 * remaining_step_ratio + min(0.02, 0.10 * path_pressure)
+        ratio += min(0.01, 0.05 * float(self.local_obstacle_ratio))
+        return float(np.clip(ratio, 0.94, 0.985))
+
+    def _recharge_risk_score(self):
+        """Risk score in [0, 1] used to scale recharge rewards and penalties."""
+        if not self.has_charger:
+            return 0.0
+        battery_ratio = self.battery / max(self.battery_max, 1)
+        margin_deficit = max(0.0, self.recharge_enter_margin - self.charger_safety_margin)
+        margin_risk = margin_deficit / max(self.charger_safety_buffer + self.recharge_enter_margin, 1.0)
+        low_battery_risk = max(0.0, self.recharge_low_battery_ratio - battery_ratio)
+        low_battery_risk /= max(self.recharge_low_battery_ratio, 1e-6)
+        progress_risk = min(1.0, float(self.recharge_no_progress_steps) / 5.0)
+        return float(np.clip(0.55 * margin_risk + 0.35 * low_battery_risk + 0.10 * progress_risk, 0.0, 1.0))
+
+    def useful_charge_reward_weight(self):
+        """Adaptive reward weight for charging that happens under real battery pressure."""
+        prev_battery_ratio = self.prev_battery / max(self.prev_battery_max, 1)
+        prev_low_risk = max(0.0, self.recharge_low_battery_ratio - prev_battery_ratio)
+        prev_low_risk /= max(self.recharge_low_battery_ratio, 1e-6)
+        risk = max(self._recharge_risk_score(), prev_low_risk)
+        mode_bonus = 0.25 if self.was_recharge_mode or self.prev_low_battery else 0.0
+        return float(np.clip(1.2 + 1.1 * risk + mode_bonus, 1.2, 2.6))
+
+    def battery_fail_penalty(self):
+        """Adaptive terminal penalty for running out of battery before max steps."""
+        step_ratio = _norm(self.step_no, self.max_step)
+        early_fail_risk = 1.0 - step_ratio
+        path_pressure = float(max(self.charger_energy_cost, 0.0)) / max(float(self.battery_max), 1.0)
+        risk = max(self._recharge_risk_score(), min(1.0, path_pressure))
+        return float(np.clip(5.5 + 2.5 * early_fail_risk + 1.0 * risk, 5.5, 9.0))
+
     def _min_charger_range_dist(self, x, z):
         if not self.charger_rects:
             return float(self.GRID_SIZE)
@@ -490,8 +567,8 @@ class Preprocessor:
         [1]  battery_ratio      battery level / 电量比 [0,1]
         [2]  cleaning_progress  cleaned ratio / 已清扫比例 [0,1]
         [3]  remaining_dirt     remaining dirt ratio / 剩余污渍比例 [0,1]
-        [4]  pos_x_norm         x position / x 坐标归一化 [0,1]
-        [5]  pos_z_norm         z position / z 坐标归一化 [0,1]
+        [4]  pos_x_weak         weak x position / 弱化后的 x 坐标 [0.4,0.6]
+        [5]  pos_z_weak         weak z position / 弱化后的 z 坐标 [0.4,0.6]
         [6]  ray_N_dirt         north ray distance / 向上(z-)方向最近污渍距离
         [7]  ray_E_dirt         east ray distance / 向右(x+)方向
         [8]  ray_S_dirt         south ray distance / 向下(z+)方向
@@ -521,8 +598,8 @@ class Preprocessor:
         remaining_dirt = 1.0 - cleaning_progress
 
         hx, hz = self.cur_pos
-        pos_x_norm = _norm(hx, self.GRID_SIZE)
-        pos_z_norm = _norm(hz, self.GRID_SIZE)
+        pos_x_weak = self._weak_abs_position_feature(hx)
+        pos_z_weak = self._weak_abs_position_feature(hz)
 
         # 4-directional ray to find nearest dirt
         # 四方向射线找最近污渍距离
@@ -560,8 +637,8 @@ class Preprocessor:
                 battery_ratio,
                 cleaning_progress,
                 remaining_dirt,
-                pos_x_norm,
-                pos_z_norm,
+                pos_x_weak,
+                pos_z_weak,
                 ray_dirt[0],
                 ray_dirt[1],
                 ray_dirt[2],
@@ -588,6 +665,10 @@ class Preprocessor:
             dtype=np.float32,
         )
 
+    def _weak_abs_position_feature(self, value):
+        pos_norm = _norm(value, self.GRID_SIZE)
+        return 0.5 + self.ABS_POS_FEATURE_SCALE * (pos_norm - 0.5)
+
     def _calc_nearest_dirt_dist(self):
         """Find nearest dirt path distance from local view.
 
@@ -668,7 +749,7 @@ class Preprocessor:
         if self.recharge_mode:
             legal = self._filter_recharge_actions(legal)
             legal = self._filter_recharge_escape_actions(legal, safe_legal)
-        elif self.on_charger and self.battery / max(self.battery_max, 1) > 0.65:
+        elif self.on_charger and self.battery / max(self.battery_max, 1) >= self.full_charge_leave_ratio:
             legal = self._filter_leave_charger_actions(legal)
 
         return list(legal)
@@ -886,7 +967,7 @@ class Preprocessor:
             self.prev_low_battery or self.was_recharge_mode or prev_battery_ratio < 0.45
         )
         if useful_charge:
-            charge_reward += 1.0
+            charge_reward += self.useful_charge_reward_weight()
         elif self.charge_delta > 0 and battery_ratio > 0.65:
             charge_reward -= 0.25 * min(self.charge_delta, 3)
 
@@ -894,11 +975,13 @@ class Preprocessor:
             dist_delta = float(
                 np.clip(self.last_nearest_charger_path_dist - self.nearest_charger_path_dist, -4.0, 4.0)
             )
-            approach_scale = 0.06 if self.charger_safety_margin <= 0 else 0.04
-            retreat_scale = 0.03 if self.charger_safety_margin <= 0 else 0.02
+            recharge_risk = self._recharge_risk_score()
+            approach_scale = 0.04 + 0.04 * recharge_risk
+            retreat_scale = 0.02 + 0.03 * recharge_risk
             charge_reward += approach_scale * dist_delta if dist_delta > 0 else retreat_scale * dist_delta
-            if self.charger_safety_margin < 0:
-                charge_reward -= min(0.35, abs(self.charger_safety_margin) / max(self.battery_max, 1))
+            if self.charger_safety_margin < self.recharge_enter_margin:
+                safety_shortage = self.recharge_enter_margin - self.charger_safety_margin
+                charge_reward -= min(0.55, safety_shortage / max(self.battery_max, 1))
         elif self.on_charger and battery_ratio > 0.65:
             charge_reward -= 0.08
 
@@ -926,7 +1009,7 @@ class Preprocessor:
         terminal_penalty = 0.0
         if self.terminated and not self.truncated:
             if self.battery <= 0 or self.remaining_charge <= 0:
-                terminal_penalty -= 4.0
+                terminal_penalty -= self.battery_fail_penalty()
             elif self.npc_danger or self.nearest_npc_dist <= 1:
                 terminal_penalty -= 3.0
 
diff --git a/agent_ppo/workflow/train_workflow.py b/agent_ppo/workflow/train_workflow.py
index 14a73aa..648fee1 100644
--- a/agent_ppo/workflow/train_workflow.py
+++ b/agent_ppo/workflow/train_workflow.py
@@ -158,7 +158,7 @@ class EpisodeRunner:
                         result_str = "WIN"
                     else:
                         if fm.battery <= 0 or remaining_charge <= 0:
-                            final_reward = -4.0 + 6.0 * cleaning_ratio
+                            final_reward = -fm.battery_fail_penalty() + 4.0 * cleaning_ratio
                             result_str = "BATTERY_FAIL"
                         elif fm.npc_danger or fm.nearest_npc_dist <= 1:
                             final_reward = -3.0 + 6.0 * cleaning_ratio