You need to enable JavaScript to run this app.
最新活动
大模型
产品
解决方案
定价
生态与合作
支持与服务
开发者
了解我们

Keras实现DQN训练CartPole-v1无性能提升问题求助

解决CartPole-v1 DQN训练无提升的问题

你好!我看了你的代码和问题描述,你遇到的情况是新手用DQN训练CartPole这类环境时非常典型的问题——不是代码有语法bug,而是DQN的核心训练机制没做到位,导致模型学不到稳定的策略。下面我帮你拆解问题并给出修改方案:

核心问题分析

你的当前实现是最基础的Q-learning在线更新版本,没有用到DQN的两个关键稳定机制,这对于CartPole这种需要稳定学习信号的环境来说是致命的:

  1. 缺少经验回放(Experience Replay)
    你每一步都用当前的样本直接更新模型,这样样本之间是高度相关的(连续的游戏帧状态是关联的),会导致模型训练震荡甚至无法收敛。经验回放通过存储历史样本并随机采样训练,能打破这种相关性,让学习更稳定。

  2. 缺少目标网络(Target Network)
    你用同一个模型来计算当前Q值和目标Q值(target = reward + gamma*np.max(model.predict(new_observation))),这会导致目标值和预测值相互纠缠,训练过程不稳定。目标网络是一个固定一段时间的副本,用来计算稳定的目标Q值,能大幅提升收敛性。

另外还有两个小问题:

  • epsilon衰减策略不合理:初始0.5的探索率不算高,但每轮衰减0.999的速度太慢,500轮后还保持在0.3左右,模型没法充分利用已学到的策略。
  • 动作选择的小瑕疵np.argmax(model.predict(observation))里,model.predict返回的是二维数组(因为你的observation是(-1,4)形状),应该取第一个元素再argmax,不过这个不是主要问题。

修改后的完整代码

下面是加入了经验回放和目标网络的完整实现,同时调整了epsilon策略和训练逻辑:

import gym
import numpy as np
from keras.models import Sequential
from keras.layers import Dense
from keras.optimizers import Adam

# 准备环境
env = gym.make('CartPole-v1')

# 创建主模型和目标模型
def build_model(state_size, action_size):
    model = Sequential()
    model.add(Dense(128, input_shape=(state_size,), activation='relu'))
    model.add(Dense(64, activation='relu'))
    model.add(Dense(action_size, activation='linear'))
    model.compile(loss='mse', optimizer=Adam(learning_rate=0.001))
    return model

state_size = env.observation_space.shape[0]
action_size = env.action_space.n
model = build_model(state_size, action_size)
target_model = build_model(state_size, action_size)
target_model.set_weights(model.get_weights())  # 初始化目标网络权重

# 经验回放缓冲区
class ReplayBuffer:
    def __init__(self, capacity):
        self.capacity = capacity
        self.buffer = []
        self.position = 0
    
    def push(self, state, action, reward, next_state, done):
        if len(self.buffer) < self.capacity:
            self.buffer.append(None)
        self.buffer[self.position] = (state, action, reward, next_state, done)
        self.position = (self.position + 1) % self.capacity
    
    def sample(self, batch_size):
        batch = np.random.choice(len(self.buffer), batch_size, replace=False)
        states, actions, rewards, next_states, dones = zip(*[self.buffer[i] for i in batch])
        return np.array(states), np.array(actions), np.array(rewards), np.array(next_states), np.array(dones)
    
    def __len__(self):
        return len(self.buffer)

# 训练函数
def train_model(n_episodes=500, batch_size=32, gamma=0.95, epsilon_start=0.9, epsilon_end=0.01, epsilon_decay=0.995, target_update=10):
    replay_buffer = ReplayBuffer(10000)
    G_array = []
    
    for episode in range(n_episodes):
        state = env.reset()
        state = state.reshape(1, state_size)
        epsilon = max(epsilon_end, epsilon_start * (epsilon_decay ** episode))
        G = 0
        done = False
        
        while not done:
            # epsilon-greedy动作选择
            if np.random.random() < epsilon:
                action = env.action_space.sample()
            else:
                q_values = model.predict(state, verbose=0)
                action = np.argmax(q_values[0])
            
            next_state, reward, done, info = env.step(action)
            next_state = next_state.reshape(1, state_size)
            
            # 可选:优化奖励信号,比如当杆角度过大时给负奖励
            pole_angle = abs(next_state[0][2])
            if pole_angle > 0.2:  # 约11.5度
                reward -= 0.1
            
            # 存入经验回放
            replay_buffer.push(state[0], action, reward, next_state[0], done)
            
            state = next_state
            G += reward
            
            # 当缓冲区足够大时开始批量训练
            if len(replay_buffer) >= batch_size:
                states, actions, rewards, next_states, dones = replay_buffer.sample(batch_size)
                
                # 用目标网络计算目标Q值
                target_q = model.predict(states, verbose=0)
                next_q = target_model.predict(next_states, verbose=0)
                for i in range(batch_size):
                    if dones[i]:
                        target_q[i][actions[i]] = rewards[i]
                    else:
                        target_q[i][actions[i]] = rewards[i] + gamma * np.max(next_q[i])
                
                # 批量更新主模型
                model.fit(states, target_q, epochs=1, verbose=0)
        
        # 每隔target_update轮更新目标网络权重
        if episode % target_update == 0:
            target_model.set_weights(model.get_weights())
        
        G_array.append(G)
        if (episode + 1) % 10 == 0:
            print(f"Episode {episode+1}, Total Reward: {G}, Epsilon: {epsilon:.3f}")
    
    return G_array

G_array = train_model()
print("Final reward array:", G_array)

关键改动说明

  1. 经验回放缓冲区:用一个队列存储历史交互样本,每次随机采样32个样本批量训练,打破样本相关性。
  2. 目标网络:每10轮将主模型的权重复制给目标网络,用它来计算稳定的目标Q值,避免训练震荡。
  3. 优化的epsilon策略:初始探索率0.9,每轮衰减到0.995,直到降到0.01,平衡探索和利用。
  4. 可选的奖励优化:给杆角度过大的情况加小惩罚,让模型更清晰地感知错误动作。
  5. 批量训练:不再每步更新模型,而是积累足够样本后批量更新,训练效率和稳定性都更高。

运行这段代码后,你应该能看到G_array里的奖励值逐渐上升,最终稳定在500(CartPole-v1的最大奖励,代表游戏通关)。

内容的提问来源于stack exchange,提问作者toenails_sauce

火山引擎 最新活动