# ============================================================
# Unit 4: Policy Gradient (REINFORCE) for CartPole-v1
# Deep Reinforcement Learning Course - Hugging Face
# ============================================================
import numpy as np
import matplotlib.pyplot as plt
from collections import deque
import gymnasium as gym
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import pickle
import os
# ============================================================
# Device configuration
# ============================================================
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
print(f"Using device: {device}")
# ============================================================
# Environment configuration
# ============================================================
ENV_ID = "CartPole-v1"  # use CartPole-v1 throughout
env = gym.make(ENV_ID)
eval_env = gym.make(ENV_ID)
s_size = env.observation_space.shape[0]  # state space dimension
a_size = env.action_space.n  # action space dimension (number of discrete actions)
print(f"Environment: {ENV_ID}")
print(f"Observation Space: {s_size}, Action Space: {a_size}")
# ============================================================
# Policy network definition
# ============================================================
class Policy(nn.Module):
"""
    Policy network: takes a state as input and outputs an action probability distribution.
"""
def __init__(self, s_size, a_size, h_size=128):
"""
        Initialize the policy network.
        Args:
            s_size: state space dimension
            a_size: action space dimension
            h_size: hidden layer size
"""
super(Policy, self).__init__()
self.fc1 = nn.Linear(s_size, h_size)
self.fc2 = nn.Linear(h_size, a_size)
# self.fc3 = nn.Linear(h_size, a_size)
def forward(self, x):
"""
        Forward pass.
        Args:
            x: input state
        Returns:
            action probability distribution
"""
x = F.relu(self.fc1(x))
# x = F.relu(self.fc2(x))
x = self.fc2(x)
return F.softmax(x, dim=1)
def act(self, state):
"""
        Select an action according to the current policy.
        Args:
            state: current state
        Returns:
            action: the selected action
            log_prob: log-probability of that action (used for the gradient computation)
"""
        # Convert the state to a tensor and move it to the right device
state = torch.from_numpy(state).float().unsqueeze(0).to(device)
        # Get the action probability distribution (kept on the same device)
probs = self.forward(state)
        # Build a categorical distribution over the actions
m = torch.distributions.Categorical(probs)
        # Sample an action from the distribution (stochastic, not greedy)
action = m.sample()
        # Return the action value and its log-probability
return action.item(), m.log_prob(action)
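# Usage sketch (illustrative only, not part of training; the names demo_policy
# and obs below are hypothetical): act() takes a NumPy observation and returns
# a Python int action plus a 1-element tensor with that action's log-probability.
# demo_policy = Policy(s_size, a_size).to(device)
# obs, _ = env.reset()
# action, log_prob = demo_policy.act(obs)  # e.g. action=0, log_prob=tensor([-0.69...])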
# ============================================================
# REINFORCE algorithm implementation
# ============================================================
def reinforce(policy, optimizer, n_training_episodes, max_t, gamma, print_every):
"""
    Train the policy with the REINFORCE algorithm.
    Args:
        policy: policy network
        optimizer: optimizer
        n_training_episodes: number of training episodes
        max_t: maximum number of steps per episode
        gamma: discount factor
        print_every: how often (in episodes) to print progress
    Returns:
        scores: list of per-episode scores
"""
    scores_deque = deque(maxlen=100)  # scores of the last 100 episodes
    scores = []  # all episode scores
for i_episode in range(1, n_training_episodes + 1):
        saved_log_probs = []  # log-probability of each action taken
        rewards = []  # reward received at each step
state, _ = env.reset()
        # --- 1. Collect one complete trajectory ---
for t in range(max_t):
            # Select an action according to the current policy
action, log_prob = policy.act(state)
saved_log_probs.append(log_prob)
            # Take the action and observe the next state and reward
state, reward, terminated, truncated, _ = env.step(action)
rewards.append(reward)
            # Stop if the episode has ended
if terminated or truncated:
break
        # Record this episode's total score
scores_deque.append(sum(rewards))
scores.append(sum(rewards))
        # --- 2. Compute discounted returns ---
returns = deque(maxlen=max_t)
n_steps = len(rewards)
        # Accumulate discounted returns from the end of the episode backwards: G_t = r_t + γ*r_{t+1} + γ²*r_{t+2} + ...
G = 0
for r in reversed(rewards):
G = r + gamma * G
returns.appendleft(G)
        # Normalize the returns (an important engineering trick that improves training stability)
returns = torch.tensor(returns).to(device)
returns = (returns - returns.mean()) / (returns.std() + 1e-8)
        # --- 3. Compute the policy gradient loss ---
        # Policy gradient theorem: ∇J(θ) = E[∇log π(a|s) * G_t]
        # Loss: L = -Σ(log_prob * return)  (negative sign because we maximize the return)
policy_loss = []
for log_prob, return_val in zip(saved_log_probs, returns):
policy_loss.append(-log_prob * return_val)
        # Sum the per-step loss terms
policy_loss = torch.cat(policy_loss).sum()
        # --- 4. Backpropagate and update the parameters ---
optimizer.zero_grad()
policy_loss.backward()
optimizer.step()
        # Print training progress
if i_episode % print_every == 0:
print(f'Episode {i_episode}\tAverage Score: {np.mean(scores_deque):.2f}')
return scores
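# Optional helper (a minimal sketch, not called by reinforce() above): computes the
# same discounted returns as the backward loop in step 2, so that step can be tested
# in isolation. The function name is illustrative.
def compute_discounted_returns(rewards, gamma):
    """Return [G_0, ..., G_{T-1}] where G_t = r_t + gamma * G_{t+1}."""
    returns = []
    G = 0.0
    for r in reversed(rewards):
        G = r + gamma * G
        returns.append(G)
    returns.reverse()
    return returns
# Example: compute_discounted_returns([1.0, 1.0, 1.0], 0.9) -> [2.71, 1.9, 1.0]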
# ============================================================
# Evaluation function
# ============================================================
def evaluate_policy(policy, eval_env, n_eval_episodes=10):
"""
    Evaluate the policy's performance.
    Args:
        policy: trained policy network
        eval_env: evaluation environment
        n_eval_episodes: number of evaluation episodes
    Returns:
        episode_rewards: list of per-episode rewards
        mean_reward: mean reward
        std_reward: standard deviation of the rewards
"""
episode_rewards = []
for i in range(n_eval_episodes):
state, _ = eval_env.reset()
episode_reward = 0
done = False
while not done:
            # Use a deterministic policy for evaluation (take the most probable action instead of sampling)
with torch.no_grad():
state_tensor = torch.from_numpy(state).float().unsqueeze(0).to(device)
probs = policy.forward(state_tensor)
action = torch.argmax(probs, dim=1).item()
state, reward, terminated, truncated, _ = eval_env.step(action)
episode_reward += reward
done = terminated or truncated
episode_rewards.append(episode_reward)
print(f"Eval Episode {i+1}: Reward = {episode_reward}")
mean_reward = np.mean(episode_rewards)
std_reward = np.std(episode_rewards)
print(f"\nEvaluation Results:")
print(f"Mean Reward: {mean_reward:.2f}")
print(f"Std Reward: {std_reward:.2f}")
print(f"Score (mean - std): {mean_reward - std_reward:.2f}")
return episode_rewards, mean_reward, std_reward
# ============================================================
# Main training procedure
# ============================================================
if __name__ == "__main__":
    # Hyperparameters
HIDDEN_SIZE = 128
LEARNING_RATE = 3e-3
N_TRAINING_EPISODES = 800
MAX_T = 1000
GAMMA = 0.99
PRINT_EVERY = 100
print("="*60)
print("Starting REINFORCE Training")
print("="*60)
    # Initialize the policy network and the optimizer
policy = Policy(s_size, a_size, HIDDEN_SIZE).to(device)
optimizer = optim.Adam(policy.parameters(), lr=LEARNING_RATE)
print(f"Policy Network: {policy}")
print(f"Optimizer: Adam (lr={LEARNING_RATE})")
print(f"Training Episodes: {N_TRAINING_EPISODES}")
print(f"Max Steps per Episode: {MAX_T}")
print(f"Discount Factor: {GAMMA}")
print()
    # Start training
scores = reinforce(policy, optimizer, N_TRAINING_EPISODES, MAX_T, GAMMA, PRINT_EVERY)
print("\n" + "="*60)
print("Training Completed!")
print("="*60)
    # Save the model
MODEL_PATH = "reinforce_cartpole.pth"
torch.save({
'policy_state_dict': policy.state_dict(),
'optimizer_state_dict': optimizer.state_dict(),
's_size': s_size,
'a_size': a_size,
'hidden_size': HIDDEN_SIZE,
'scores': scores
}, MODEL_PATH)
print(f"✅ Model saved to {MODEL_PATH}")
    # Evaluate the trained model
print("\n" + "="*60)
print("Evaluating Trained Policy")
print("="*60)
episode_rewards, mean_reward, std_reward = evaluate_policy(policy, eval_env, n_eval_episodes=10)
    # Plot the learning curves
plt.figure(figsize=(12, 4))
plt.subplot(1, 2, 1)
plt.plot(scores)
plt.title('Training Scores')
plt.xlabel('Episode')
plt.ylabel('Score')
plt.grid(True)
plt.subplot(1, 2, 2)
    # Compute the moving average
window_size = 100
if len(scores) >= window_size:
moving_avg = [np.mean(scores[i:i+window_size]) for i in range(len(scores)-window_size+1)]
plt.plot(range(window_size-1, len(scores)), moving_avg)
plt.title(f'Moving Average (window={window_size})')
plt.xlabel('Episode')
plt.ylabel('Average Score')
plt.grid(True)
plt.tight_layout()
plt.savefig('training_results.png', dpi=150, bbox_inches='tight')
plt.show()
print(f"\n🎉 Final Results:")
print(f" Mean Reward: {mean_reward:.2f}")
print(f" Std Reward: {std_reward:.2f}")
print(f" Score: {mean_reward - std_reward:.2f}")
print(f" Required for CartPole-v1: 350.0")
if mean_reward - std_reward >= 350:
print(f" Status: ✅ PASSED!")
else:
print(f" Status: ❌ Need {350 - (mean_reward - std_reward):.2f} more points")