diff --git a/agent_ppo/agent.py b/agent_ppo/agent.py
index 1dbbde8..0de6e31 100644
--- a/agent_ppo/agent.py
+++ b/agent_ppo/agent.py
@@ -115,7 +115,11 @@ class Agent(BaseAgent):
         logits, value = self._run_model(obs_data.feature)
         legal_arr = np.array(obs_data.legal_action, dtype=np.float32)
         prob = self._legal_soft_max(logits, legal_arr)
-        action = self._tie_break_eval_action(prob, legal_arr)
+        action = None
+        if hasattr(self.preprocessor, "planned_eval_action"):
+            action = self.preprocessor.planned_eval_action(prob, legal_arr)
+        if action is None:
+            action = self._tie_break_eval_action(prob, legal_arr)
         act_data = ActData(
             action=[action],
             d_action=[action],
diff --git a/agent_ppo/feature/preprocessor.py b/agent_ppo/feature/preprocessor.py
index b04492e..5fd55ca 100644
--- a/agent_ppo/feature/preprocessor.py
+++ b/agent_ppo/feature/preprocessor.py
@@ -759,51 +759,51 @@ class Preprocessor:
 
     def _charger_safety_buffer(self):
         # One move roughly costs one charge; reserve extra for detours, local obstacles, and policy noise.
-        base = max(22.0, 0.14 * float(self.battery_max))
-        distance_buffer = min(18.0, 0.20 * float(max(self.nearest_charger_range_dist, 0.0)))
-        obstacle_buffer = 14.0 * float(self.local_obstacle_ratio)
-        route_uncertainty_buffer = 10.0 if self.has_charger and not self.charger_route_known else 0.0
-        return float(np.clip(base + distance_buffer + obstacle_buffer + route_uncertainty_buffer, 22.0, 58.0))
+        base = max(12.0, 0.07 * float(self.battery_max))
+        distance_buffer = min(10.0, 0.12 * float(max(self.nearest_charger_range_dist, 0.0)))
+        obstacle_buffer = 10.0 * float(self.local_obstacle_ratio)
+        route_uncertainty_buffer = 6.0 if self.has_charger and not self.charger_route_known else 0.0
+        return float(np.clip(base + distance_buffer + obstacle_buffer + route_uncertainty_buffer, 12.0, 34.0))
 
     def _recharge_enter_margin(self):
         """Adaptive margin for entering recharge mode before the battery is barely enough."""
-        base = max(7.0, 0.025 * float(self.battery_max))
-        path_margin = min(14.0, 0.10 * float(max(self.nearest_charger_path_dist, 0.0)))
-        obstacle_margin = 14.0 * float(self.local_obstacle_ratio)
-        route_uncertainty_margin = 8.0 if self.has_charger and not self.charger_route_known else 0.0
-        recovery_margin = min(8.0, 1.5 * float(self.recharge_no_progress_steps + self.fake_charger_steps))
+        base = max(4.0, 0.018 * float(self.battery_max))
+        path_margin = min(8.0, 0.06 * float(max(self.nearest_charger_path_dist, 0.0)))
+        obstacle_margin = 8.0 * float(self.local_obstacle_ratio)
+        route_uncertainty_margin = 5.0 if self.has_charger and not self.charger_route_known else 0.0
+        recovery_margin = min(6.0, 1.2 * float(self.recharge_no_progress_steps + self.fake_charger_steps))
         return float(
             np.clip(
                 base + path_margin + obstacle_margin + route_uncertainty_margin + recovery_margin,
-                6.0,
-                42.0,
+                4.0,
+                26.0,
             )
         )
 
     def _recharge_leave_margin(self):
         """Adaptive safety margin required before leaving a charger."""
-        base = max(20.0, 0.08 * float(self.battery_max))
-        path_margin = min(18.0, 0.14 * float(max(self.nearest_charger_path_dist, 0.0)))
-        obstacle_margin = 12.0 * float(self.local_obstacle_ratio)
-        return float(np.clip(base + path_margin + obstacle_margin, 20.0, 64.0))
+        base = max(12.0, 0.05 * float(self.battery_max))
+        path_margin = min(12.0, 0.10 * float(max(self.nearest_charger_path_dist, 0.0)))
+        obstacle_margin = 8.0 * float(self.local_obstacle_ratio)
+        return float(np.clip(base + path_margin + obstacle_margin, 12.0, 42.0))
 
     def _recharge_low_battery_ratio(self):
         """Adaptive low-battery ratio based on route length and local obstacle density."""
         path_pressure = float(max(self.nearest_charger_path_dist, 0.0)) / max(float(self.battery_max), 1.0)
-        ratio = 0.32 + min(0.09, 0.42 * path_pressure) + min(0.04, 0.14 * float(self.local_obstacle_ratio))
+        ratio = 0.22 + min(0.10, 0.36 * path_pressure) + min(0.035, 0.12 * float(self.local_obstacle_ratio))
         if self.has_charger and not self.charger_route_known:
-            ratio += 0.04
+            ratio += 0.035
         if self.recharge_no_progress_steps > 0 or self.fake_charger_steps > 0:
             ratio += 0.02
-        return float(np.clip(ratio, 0.32, 0.46))
+        return float(np.clip(ratio, 0.22, 0.38))
 
     def _full_charge_leave_ratio(self):
         """Adaptive near-full threshold for leaving a charger."""
         remaining_step_ratio = 1.0 - _norm(self.step_no, self.max_step)
         path_pressure = float(max(self.nearest_charger_path_dist, 0.0)) / max(float(self.battery_max), 1.0)
-        ratio = 0.88 + 0.04 * remaining_step_ratio + min(0.02, 0.08 * path_pressure)
+        ratio = 0.84 + 0.04 * remaining_step_ratio + min(0.02, 0.08 * path_pressure)
         ratio += min(0.01, 0.04 * float(self.local_obstacle_ratio))
-        return float(np.clip(ratio, 0.88, 0.95))
+        return float(np.clip(ratio, 0.84, 0.92))
 
     def _recharge_risk_score(self):
         """Risk score in [0, 1] used to scale recharge rewards and penalties."""
@@ -823,8 +823,8 @@ class Preprocessor:
         prev_low_risk = max(0.0, self.recharge_low_battery_ratio - prev_battery_ratio)
         prev_low_risk /= max(self.recharge_low_battery_ratio, 1e-6)
         risk = max(self._recharge_risk_score(), prev_low_risk)
-        mode_bonus = 0.8 if self.was_recharge_mode or self.prev_low_battery else 0.0
-        return float(np.clip(3.0 + 2.8 * risk + mode_bonus, 3.0, 6.5))
+        mode_bonus = 0.25 if self.was_recharge_mode or self.prev_low_battery else 0.0
+        return float(np.clip(0.60 + 0.65 * risk + mode_bonus, 0.60, 1.45))
 
     def battery_fail_penalty(self):
         """Adaptive terminal penalty for running out of battery before max steps."""
@@ -1239,6 +1239,183 @@ class Preprocessor:
             score -= 1.0
         return float(score)
 
+    def planned_eval_action(self, probs, legal_action):
+        """Return a planner action for evaluation when it clearly beats the policy.
+
+        The planner is only used by exploit(). Training samples still come from
+        the stochastic PPO policy.
+
+        Args:
+            probs: Per-action policy probabilities, indexed by action id.
+            legal_action: Per-action legality mask; entries above 0.5 are legal.
+                A mask with no legal entry is treated as "every action legal".
+
+        Returns:
+            The planner's action id, or None when the caller should fall back
+            to its own policy-based choice.
+        """
+        probs = np.asarray(probs, dtype=np.float64)
+        legal = np.asarray(legal_action, dtype=np.float32) > 0.5
+        if not np.any(legal):
+            legal = np.ones(len(self.ACTION_DIRS), dtype=bool)
+
+        # Drop mask entries that have no matching probability so the probs[...]
+        # lookups below cannot index past the end of the array.
+        legal_indices = np.flatnonzero(legal[: probs.size])
+        if legal_indices.size == 0:
+            return None
+
+        scored = []
+        score_by_action = {}
+        for action in legal_indices:
+            action = int(action)
+            score = self._planned_eval_score(action)
+            score_by_action[action] = score
+            # NaN compares False against everything, so it would slip past the
+            # hard-reject test and make the sort order below unreliable.
+            if not np.isfinite(score) or score <= -1e5:
+                continue
+            scored.append((score, float(probs[action]), -action, action))
+
+        if not scored:
+            return None
+
+        # Highest score wins; ties fall to the higher policy probability, then
+        # to the lower action id.
+        scored.sort(reverse=True)
+        best_score, _, _, planned_action = scored[0]
+        policy_action = int(legal_indices[np.argmax(probs[legal_indices])])
+        if planned_action == policy_action:
+            return planned_action
+
+        # Reuse the score computed in the loop instead of scoring the action again.
+        policy_score = score_by_action[policy_action]
+        policy_prob = float(probs[policy_action])
+        planned_prob = float(probs[planned_action])
+        force_safety = (
+            self.recharge_mode
+            or self.low_battery
+            or self.npc_danger
+            or self.npc_predicted_danger
+            or self.stuck_steps >= 1
+        )
+        if force_safety:
+            return planned_action
+
+        # Strongly prefer deterministic coverage when the learned policy is
+        # uncertain or the planner sees a much better cleaning/frontier move.
+        if policy_prob < 0.45 and best_score >= policy_score + 0.50:
+            return planned_action
+        if policy_prob - planned_prob <= 0.35 and best_score >= policy_score + 2.20:
+            return planned_action
+        return None
+
+    def _planned_eval_score(self, action):
+        """Score one legal action for evaluation-time coverage planning.
+
+        Args:
+            action: Action id, used as an index into ACTION_DIRS.
+
+        Returns:
+            A float where larger is better, or -1e6 when the move is rejected
+            outright (invalid id, off-grid target, visibly impassable target,
+            diagonal with both adjacent cells blocked, or NPC danger cell).
+        """
+        action = int(action)
+        if not (0 <= action < len(self.ACTION_DIRS)):
+            return -1e6
+        dx, dz = self.ACTION_DIRS[action]
+        hx, hz = self.cur_pos
+        nx, nz = hx + dx, hz + dz
+        if not (0 <= nx < self.GRID_SIZE and 0 <= nz < self.GRID_SIZE):
+            return -1e6
+        if not self._is_visible_cell_passable(dx, dz):
+            return -1e6
+        if dx != 0 and dz != 0:
+            if not (self._is_visible_cell_passable(dx, 0) or self._is_visible_cell_passable(0, dz)):
+                return -1e6
+        if self._is_npc_danger_cell(nx, nz, expanded=False):
+            return -1e6
+
+        score = self.evaluation_action_score(action)
+        cell = self._view_cell(dx, dz, default=1)
+
+        recharge_required = (
+            self.has_charger
+            and (
+                self.recharge_mode
+                or self.low_battery
+                or self.charger_safety_margin <= self.recharge_enter_margin + 4.0
+            )
+        )
+        if recharge_required:
+            # Recharge takes priority: the cleaning/exploration terms below are skipped.
+            cur_dist = self._charger_move_distance(hx, hz)
+            next_dist = self._charger_move_distance(nx, nz)
+            dist_delta = float(np.clip(cur_dist - next_dist, -2.0, 2.0))
+            score += 10.0 * dist_delta
+            if next_dist < cur_dist:
+                score += 3.0
+            if self._is_charger_cell(nx, nz):
+                score += 5.0
+            if cell == 2 and self.charger_safety_margin > self.recharge_enter_margin + 10.0:
+                score += 1.0
+            return float(score)
+
+        if cell == 2:
+            score += 10.0
+        else:
+            score -= 0.15
+
+        current_local_dirt = self.nearest_dirt_dist
+        next_local_dirt = self._nearest_local_dirt_dist_from(dx, dz)
+        if current_local_dirt < 200.0 and next_local_dirt < 200.0:
+            score += 3.0 * float(np.clip(current_local_dirt - next_local_dirt, -2.0, 2.0))
+
+        if self.global_dirty_path_dist < self.GRID_SIZE:
+            score += 5.0 * float(self.global_dirty_action_delta[action])
+        elif self.frontier_path_dist < self.GRID_SIZE:
+            score += 3.5 * float(self.frontier_action_delta[action])
+
+        visit_count = int(self.visit_count_map[nx, nz])
+        score += 0.65 if visit_count == 0 else -0.16 * min(visit_count, 12)
+        if action == self.last_action and self.stuck_steps == 0:
+            score += 0.10
+        if self.has_charger and self.charger_safety_margin <= self.recharge_enter_margin + 12.0:
+            score += 2.0 * float(self.charger_action_delta[action])
+        battery_ratio = self.battery / max(self.battery_max, 1)
+        if self._is_charger_cell(nx, nz) and battery_ratio > 0.55:
+            score -= 4.0
+        if self._is_npc_danger_cell(nx, nz, expanded=True):
+            score -= 3.0
+        return float(score)
+
+    def _nearest_local_dirt_dist_from(self, dx, dz):
+        """Nearest visible dirt path distance after applying a candidate move.
+
+        Args:
+            dx: First component of the candidate move offset.
+            dz: Second component of the candidate move offset.
+
+        Returns:
+            0.0 when the target cell's view value is 2, otherwise the smallest
+            _local_bfs_distances(dx, dz) entry over view cells equal to 2, or
+            200.0 when the target cell's value is 0 or no such cell is in reach.
+        """
+        cell = self._view_cell(dx, dz, default=0)
+        if cell == 0:
+            return 200.0
+        if cell == 2:
+            return 0.0
+
+        dirt_coords = np.argwhere(self._view_map == 2)
+        if len(dirt_coords) == 0:
+            return 200.0
+
+        dist = self._local_bfs_distances(dx, dz)
+        best = min(float(dist[ri, ci]) for ri, ci in dirt_coords)
+        return best if best < self.INF_DIST else 200.0
+
     def _filter_blocked_actions(self, legal_action):
         """Filter actions that are visibly blocked in the 21x21 view."""
         legal = [int(x) for x in legal_action]
@@ -1498,25 +1675,22 @@ class Preprocessor:
         cleaned_this_step = max(0, self.dirt_cleaned - self.last_dirt_cleaned)
         cleaned_cells = self.step_cleaned_count if self.step_cleaned_count > 0 else cleaned_this_step
         battery_ratio = self.battery / max(self.battery_max, 1)
-        battery_pressure = self.has_charger and battery_ratio < self.recharge_low_battery_ratio + 0.06
-        cleaning_scale = 0.2 if self.recharge_mode else (0.55 if battery_pressure else 0.7)
-        cleaning_scale *= cleaning_multiplier
-        cleaning_reward = cleaning_scale * cleaned_cells
+        cleaning_reward = cleaning_multiplier * float(cleaned_cells)
 
         # Step penalty / 时间惩罚
-        step_penalty = -0.002
+        step_penalty = -0.004
 
         # Recharge guidance only activates when battery safety is the bottleneck.
         # 仅在低电量/回充模式下引导靠近充电桩,避免高电量蹲充电桩。
         charge_reward = 0.0
         prev_battery_ratio = self.prev_battery / max(self.prev_battery_max, 1)
         useful_charge = self.charge_delta > 0 and (
-            self.prev_low_battery or self.was_recharge_mode or prev_battery_ratio < 0.45
+            self.prev_low_battery or self.was_recharge_mode or prev_battery_ratio < 0.35
         )
         if useful_charge:
             charge_reward += self.useful_charge_reward_weight()
-        elif self.charge_delta > 0 and battery_ratio > 0.65:
-            charge_reward -= 0.25 * min(self.charge_delta, 3)
+        elif self.charge_delta > 0 and battery_ratio > 0.55:
+            charge_reward -= 0.45 * min(self.charge_delta, 3)
 
         if self.has_charger and (self.recharge_mode or self.low_battery):
             recharge_risk = self._recharge_risk_score()
@@ -1527,8 +1701,8 @@ class Preprocessor:
                 range_delta = float(
                     np.clip(self.last_nearest_charger_range_dist - self.nearest_charger_range_dist, -2.0, 2.0)
                 )
-                discovery_scale = 0.035 + 0.035 * recharge_risk
-                range_scale = 0.015 + 0.015 * recharge_risk
+                discovery_scale = 0.020 + 0.030 * recharge_risk
+                range_scale = 0.010 + 0.018 * recharge_risk
                 charge_reward += discovery_scale * frontier_progress
                 if self.prev_pos is not None and self.cur_pos != self.prev_pos and self.stuck_steps == 0:
                     charge_reward += range_scale * range_delta
@@ -1536,14 +1710,14 @@ class Preprocessor:
                 dist_delta = float(
                     np.clip(self.last_nearest_charger_path_dist - self.nearest_charger_path_dist, -4.0, 4.0)
                 )
-                approach_scale = 0.07 + 0.06 * recharge_risk
-                retreat_scale = 0.035 + 0.045 * recharge_risk
+                approach_scale = 0.040 + 0.045 * recharge_risk
+                retreat_scale = 0.020 + 0.035 * recharge_risk
                 charge_reward += approach_scale * dist_delta if dist_delta > 0 else retreat_scale * dist_delta
             if self.charger_safety_margin < self.recharge_enter_margin:
                 safety_shortage = self.recharge_enter_margin - self.charger_safety_margin
-                charge_reward -= min(0.55, safety_shortage / max(self.battery_max, 1))
-        elif self.on_charger and battery_ratio > 0.65:
-            charge_reward -= 0.08
+                charge_reward -= min(0.35, safety_shortage / max(self.battery_max, 1))
+        elif self.on_charger and battery_ratio > 0.55:
+            charge_reward -= 0.18
         charge_reward *= charge_multiplier
 
         # Encourage covering new passable cells and mildly discourage loops.
@@ -1551,22 +1725,22 @@ class Preprocessor:
         if self.recharge_mode:
             exploration_reward = 0.0
         else:
-            exploration_reward = 0.004 if self.is_new_cell else -0.0015 * min(self.current_visit_count, 6)
+            exploration_reward = 0.020 if self.is_new_cell else -0.006 * min(self.current_visit_count, 8)
             if self.global_dirty_path_dist < self.GRID_SIZE:
                 dirty_progress = np.clip(self.last_global_dirty_path_dist - self.global_dirty_path_dist, -3.0, 3.0)
-                exploration_reward += 0.008 * dirty_progress
+                exploration_reward += 0.020 * dirty_progress
             elif self.frontier_path_dist < self.GRID_SIZE:
                 frontier_progress = np.clip(self.last_frontier_path_dist - self.frontier_path_dist, -3.0, 3.0)
-                exploration_reward += 0.005 * frontier_progress
+                exploration_reward += 0.014 * frontier_progress
         exploration_reward *= exploration_multiplier
 
         # Collision/stuck signal: invalid moves waste both step and battery.
         # 撞墙/原地不动会浪费步数和电量。
         stuck_penalty = 0.0
         if self.prev_pos is not None and self.cur_pos == self.prev_pos and 0 <= self.last_action < 8:
-            stuck_penalty = -0.03
+            stuck_penalty = -0.08
             if self.recharge_mode:
-                stuck_penalty -= 0.02 * min(self.stuck_steps, 5)
+                stuck_penalty -= 0.04 * min(self.stuck_steps, 5)
 
         npc_penalty = 0.0
         if self.npc_danger:
@@ -1588,9 +1762,9 @@ class Preprocessor:
     def _reward_profile_scales(self):
         """Return multipliers for quick reward-shaping ablations."""
         if self.reward_profile == "lower_recharge":
-            return 1.0, 0.70, 1.0
+            return 1.0, 0.45, 1.0
         if self.reward_profile == "clean_explore":
-            return 1.15, 0.85, 1.50
+            return 1.10, 0.60, 1.35
         if self.reward_profile == "battery_safe":
-            return 0.95, 1.25, 0.90
+            return 0.95, 0.85, 0.90
         return 1.0, 1.0, 1.0