本系列是学习《动手学强化学习》过程中做的摘抄。
5.1 model-based 和 model-free#
根据是否具有环境模型(对环境的状态转移概率和奖励函数进行建模),强化学习算法分为两种:基于模型的强化学习(model-based reinforcement learning)和无模型的强化学习(model-free reinforcement learning)。
- model-based RL 根据 Agent 与环境交互采样到的数据直接进行策略提升或者价值估计(如 Sarsa 和 Q-learning);
- model-free RL 中,模型是可以事先知道的,也可以是根据 Agent 与环境交互采样到的数据学习得到的,然后用这个模型进行策略提升或者价值估计(如动态规划算法中的策略迭代和价值迭代、Dyna-Q)。
5.2 Dyna-Q 算法#
Dyna-Q 使用一种叫做 Q-planning 的方法来基于模型生成一些模拟数据,然后用模拟数据和真实数据一起改进策略。
Q-planning 每次选取一个曾经访问过的状态 \(s\),采取一个曾经在该状态下执行过的动作 \(a\),通过模型得到转以后的状态 \(s'\) 以及奖励 \(r\),并根据这个模拟数据 \((s,a,r,s')\) 用 Q-learning 的更新方式来更新动作价值函数。
Q-learning 的时序差分更新方式为
$$Q(s_t,a_t) \leftarrow Q(s_t,a_t) + \alpha \left[ r_t + \gamma \max_{a} Q(s_{t+1},a) - Q(s_t,a_t) \right]$$
在每次与环境交互执行一次 Q-learning 之后,Dyna-Q 会执行 \(N\) 次 Q-planning,其中 \(N\) 是一个可以事先选择的超参数,当其为 0 时就是普通的 Q-learning。
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import random
import numpy as np
class DynaQ:
def __init__(self, ncol, nrow, epsilon, alpha, gamma, n_planning, n_action=4):
"""Dyna-Q 算法"""
self.Q_table = np.zeros((ncol * nrow, n_action)) # 初始化动作价值 Q(s, a) 表格
self.n_action = n_action # 动作个数
self.epsilon = epsilon
self.alpha = alpha # 学习率
self.gamma = gamma # 折扣因子
self.n_planning = n_planning # 执行 1 次 Q-learning 后执行 Q-planning 的次数
self.model = dict() # 环境模型
def take_action(self, state):
"""选取下一步的操作"""
if np.random.random() < self.epsilon:
action = np.random.randint(self.n_action)
else:
action = np.argmax(self.Q_table[state])
return action
def q_learning(self, state, action, reward, next_state):
"""Q-learning"""
td_error = reward + self.gamma * self.Q_table[next_state].max() - self.Q_table[state, action]
self.Q_table[state, action] += self.alpha * td_error
def update(self, state, action, reward, next_state):
self.q_learning(state, action, reward, next_state)
self.model[(state, action)] = reward, next_state # 把数据添加到模型中
# Q-planning 循环
for _ in range(self.n_planning):
# 随机选择曾经遇到过的状态动作对
(s, a), (r, s_) = random.choice(list(self.model.items()))
self.q_learning(s, a, r, s_)
5.3 DQN 算法#
之前的 Q-learning 算法以一张表格来存储 \(Q\) 值,但这种做法只在环境的状态和动作都是离散的,并且空间都比较小的情况下适用。当状态或者动作数量非常大,更甚者,当状态或者动作连续的时候,需要用函数拟合的方法来估计 \(Q\) 值:将这个复杂的 \(Q\) 值表格视作数据,使用一个参数化的函数 \(Q_{\theta}\) 来拟合这些数据。
由于神经网络具有强大的表达能力,因此可以用一个神经网络来表示函数 \(Q\),称为 Q 网络。假设神经网络用来拟合 \(Q\) 的参数是 \(\omega\),即每一个状态 \(s\) 下所有可能动作 \(a\) 的 \(Q\) 值都能表示为 \(Q_{\omega}(s,a)\)。
对于一组数据 \((s_i, a_i, r_i, s_i')\),将 Q 网络的损失函数构造为均方误差的形式:
$$ \omega^{*} = \arg \min_{\omega} \frac{1}{2N} \sum_{i=1}^{N} \left[ Q_{\omega}(s_i,a_i)-(r_i + \gamma \max_{a' \in \mathcal{A}} Q_{\omega}(s_i', a')) \right]^2 $$至此,就可以将 Q-learning 算法扩展到神经网络形式——深度 \(Q\) 网络(deep Q network, DQN)。
经验回放#
DQN 是离线策略算法。为了更好地将 Q-learning 和深度神经网络结合,DQN 算法采用了经验回放方法,具体做法为维护一个回放缓冲区,将每次从环境中采样得到的 \((s, a, r, s')\) 存储到回放缓冲区中,训练 \(Q\) 网络时再从回放缓冲区中随机采样若干数据来进行训练。
目标网络#
DQN 算法最终更新的目标是让 \(Q_{\omega}(s,a)\) 逼近 \(r+\gamma \max_{a'}Q_{\omega}(s',a')\),由于 TD 误差目标本身就包含神经网络的输出,因此在更新网络参数的同时目标也在不断地改变,这非常容易造成神经网络训练的不稳定性。为了解决这一问题,DQN 使用了目标网络的思想:
- 原来的训练网络 \(Q_{\omega}(s,a)\),用于计算原来的损失函数 \(\frac{1}{2} \left[ Q_{\omega}(s,a)-(r + \gamma \max_{a' \in \mathcal{A}} Q_{\omega^{-}}(s', a')) \right]^2\) 中的 \(Q_{\omega}(s,a)\) 项,并使用正常梯度下降方法来进行更新。
- 目标网络 \(Q_{\omega^{-}}\),用于计算原来的损失函数 \(\frac{1}{2} \left[ Q_{\omega}(s,a)-(r + \gamma \max_{a' \in \mathcal{A}} Q_{\omega^{-}}(s', a')) \right]^2\) 中的 \(r + \gamma \max_{a' \in \mathcal{A}} Q_{\omega^{-}} (s',a')\) 项,其中 \(\omega^{-}\) 为目标网络中的参数。
为了让更新目标更稳定,目标网络使用训练网络的一套比较旧的参数,训练网络 \(Q_{\omega}(s,a)\) 在训练中的每一步都会更新,而目标网络的参数每隔 \(C\) 步才会与训练网络同步一次,即 \(\omega^{-} \leftarrow \omega\)。
5.4 Double DQN 算法#
普通的 DQN 算法通常会导致对 \(Q\) 值的过高估计:考虑到通过神经网络估算的 \(Q\) 值本身在某些时候会产生正向或负向的误差,在 DQN 的更新方式下神经网络会将正向误差累积。
为了解决这一问题,Double DQN 算法提出利用两套独立训练的神经网络来估算 \(\max_{a'} Q^{*}(s', a')\),具体做法是将原来的 \(\max_{a'} Q_{\omega^{-}} (s',a')\) 更改为 \(Q_{\omega^{-}}(s', \arg \max_{a'} Q_{\omega}(s',a'))\)。
DQN 与 Double DQN 的差别只在于计算状态 \(s'\) 下的 Q 值时如何选取动作:
- DQN 的优化目标可以写为 \(r + \gamma Q_{\omega^{-}} (s', \arg \max_{a'} Q_{\omega^{-}} (s',a'))\),动作的选取依靠目标网络 \(Q_{\omega^{-}}\);
- Double DQN 的优化目标为 \(r + \gamma Q_{\omega^{-}} (s', \arg \max_{a'} Q_{\omega} (s',a'))\),动作的选取依靠训练网络 \(Q_{\omega}\)。
5.5 Dueling DQN 算法#
Dueling DQN 是 DQN 的另一种改进算法。在强化学习中,将动作价值函数 \(Q\) 减去状态价值函数 \(V\) 的结果定义为优势函数 \(A\),即 \(A(s,a) = Q(s,a) - V(s)\)。在同一个状态下,所有动作的优势值之和为 0,因为所有动作的动作价值的期望就是这个状态的状态价值。据此,在 Dueling DQN 中,\(Q\) 网络被建模为:
$$ Q_{\eta, \alpha, \beta} (s,a) = V_{\eta, \alpha}(s) + A_{\eta, \beta}(s,a) $$其中,\(V_{\eta, \alpha}(s)\) 为状态价值函数,而 \(A_{\eta, \beta}(s,a)\) 则为该状态下采取不同动作的优势函数,表示不同动作的差异性:\(\eta\) 是 \(V\) 和 \(A\) 共享的网络参数,一般用在神经网络中,用来提取特征的前几层;而 \(\alpha\) 和 \(\beta\) 分别为 \(V\) 和 \(A\) 的参数。
在这样的模型下,不再让神经网络直接输出 \(Q\) 值,而是训练神经网络的最后几层的两个分支,分别输出状态价值函数和优势函数,再求和得到 \(Q\) 值。
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import random
from collections import deque
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
class ReplayBuffer:
def __init__(self, capacity):
"""经验回放池"""
self.buffer = deque(maxlen=capacity)
def add(self, state, action, reward, next_state, done):
"""加入数据"""
self.buffer.append((state, action, reward, next_state, done))
def sample(self, batch_size):
"""采样数据"""
transitions = random.sample(self.buffer, batch_size)
state, action, reward, next_state, done = zip(*transitions)
return np.array(state), action, reward, np.array(next_state), done
def size(self):
"""返回当前回放池的大小"""
return len(self.buffer)
class Qnet(nn.Module):
def __init__(self, state_dim, hidden_dim, action_dim):
"""只有一层隐藏层的 Q 网络"""
super().__init__()
self.fc1 = nn.Linear(state_dim, hidden_dim)
self.fc2 = nn.Linear(hidden_dim, action_dim)
def forward(self, x) -> torch.Tensor:
x = F.relu(self.fc1(x))
return self.fc2(x)
class VAnet(nn.Module):
def __init__(self, state_dim, hidden_dim, action_dim):
"""只有一层隐藏层的优势函数 A 网络和状态价值函数 V 网络"""
super().__init__()
self.fc1 = nn.Linear(state_dim, hidden_dim) # 共享网络部分
self.fc_A = nn.Linear(hidden_dim, action_dim)
self.fc_V = nn.Linear(hidden_dim, 1)
def forward(self, x: torch.Tensor):
A = self.fc_A(F.relu(self.fc1(x)))
V = self.fc_V(F.relu(self.fc1(x)))
Q = V + A - A.mean(1).view(-1, 1) # Q 值由 V 值和 A 值计算得到
return Q
class DQN:
def __init__(self, state_dim, hidden_dim, action_dim, learning_rate, gamma, epsilon, target_update, device, dqn_type="VanillaDQN"):
"""DQN 算法,包括 Double DQN 和 Dueling DQN"""
self.action_dim = action_dim
if dqn_type == "DuelingDQN": # Dueling DQN 采取不一样的网络框架
self.q_net = VAnet(state_dim, hidden_dim, self.action_dim).to(device)
self.target_q_net = VAnet(state_dim, hidden_dim, self.action_dim).to(device)
else:
self.q_net = Qnet(state_dim, hidden_dim, self.action_dim).to(device)
self.target_q_net = Qnet(state_dim, hidden_dim, self.action_dim).to(device)
self.optimizer = torch.optim.Adam(self.q_net.parameters(), lr=learning_rate)
self.gamma = gamma
self.epsilon = epsilon
self.target_update = target_update
self.count = 0
self.dqn_type = dqn_type
self.device = device
def take_action(self, state):
if np.random.random() < self.epsilon:
action = np.random.randint(self.action_dim)
else:
state = torch.tensor([state], dtype=torch.float).to(self.device)
action = self.q_net(state).argmax().item()
return action
def max_q_value(self, state):
state = torch.tensor([state], dtype=torch.float).to(self.device)
return self.q_net(state).max().item()
def update(self, transition_dict):
states = torch.tensor(transition_dict["states"], dtype=torch.float).to(self.device)
actions = torch.tensor(transition_dict["actions"]).view(-1, 1).to(self.device)
rewards = torch.tensor(transition_dict["rewards"], dtype=torch.float).view(-1, 1).to(self.device)
next_states = torch.tensor(transition_dict["next_states"], dtype=torch.float).to(self.device)
dones = torch.tensor(transition_dict["dones"], dtype=torch.float).view(-1, 1).to(self.device)
q_values = self.q_net(states).gather(1, actions)
if self.dqn_type == "DoubleDQN":
max_action = self.q_net(next_states).max(1)[1].view(-1, 1)
max_next_q_values = self.target_q_net(next_states).gather(1, max_action)
else:
max_next_q_values = self.target_q_net(next_states).max(1)[0].view(-1, 1)
q_targets = rewards + self.gamma * max_next_q_values * (1 - dones)
dqn_loss = torch.mean(F.mse_loss(q_values, q_targets))
self.optimizer.zero_grad()
dqn_loss.backward()
self.optimizer.step()
if self.count % self.target_update == 0:
self.target_q_net.load_state_dict(self.q_net.state_dict())
self.count += 1