强化学习(五):DQN与实现

Q函数近似

前面所讲的基本算法均只针对离散状态的马尔科夫决策过程:状态集$S$通过集合形式表示。离散状态MDP下的动作值函数一般保存为表格:行对应状态,列对应动作,这种表示方法无法应用在连续状态MDP的情况下,因此在连续状态空间下,需要通过函数近似的方式来表示动作值函数$Q(S,A)$。

DQN

算法

上一小节说到可以通过函数来近似动作值函数,但是我们不知道最优的动作值函数的形式,因此近似函数形式的选择比较麻烦,特别在状态是高维度时,寻找合适的近似函数更加困难。因为深度神经网络理论上能够良好的近似任意形式的函数,而且自动抽取高维特征,因此考虑直接通过神经网络来近似最优的动作值函数,我们称这个网络为Q-network,假设它的参数为$w$,我们希望[1]:

有两种Q-network结构能够用来近似最优动作值函数[1],如下图所示:

第一种网络的输入包括状态和动作,输出对应的值函数估计;第二种网络仅仅输入状态,输出所有可能动作对应的值函数估计值。显然应该选择第二种结构,理由有两个:我们希望Q-网络能够抽取高维状态中的特征,没必要加入动作;另外第一种结构中,为了得到状态$s$下所有动作的值函数估计值,需要针对每个动作进行前向计算,而在第二种结构下,只需要一次前向计算便可以得到所有的动作值估计值,因此更加高效。

用Q-network近似最优值函数$Q^{\star}(s, a)$时,需要知道$Q^{\star}(s, a)$的形式,但是我们不知道它的具体形式(否则也没有必要用Q-网络近似表示),但我们知道它必须满足Belleman最优方程:

根据原始Q-learning算法的思路,我们用$ r + \gamma \; max_{a'} Q^{\star}(s', a') $来近似最优动作值函数,然后通过与环境的在线(on-line)交互进行增量式学习。将$ r + \gamma \; max_{a'} Q^{\star}(s', a')$ 中的$Q^{\star}(s', a')$替换为Q-网络$Q^{\star}(s', a', w)$,通过平方误差来衡量当前Q-网络$Q^{\star}(s, a)$的误差:

当我们尝试用梯度类算法来优化上述损失函数时,会面临如下两个问题,导致优化平方差损失函数时很难收敛:
第一,神经网络理论上要求输入的状态样本是独立同分布的,而在增量式学习过程中,环境的状态转移概率具有马尔可夫性,因此得到的状态样本不满足独立性。
第二个问题称为non-stationary target[1],在原始论文[2]中作者对该问题的描述为:

“ … small updates to Q may significantly change the policy and therefore change the data distribution, and the correlations between the action-values (Q) and the target values $\; r + \gamma \; max_{a’} Q^{\star}(s’, a’, w)$ …”

离散MDP下的$Q(s, a)$是一个表格,当我们修改$Q(s, a)$时不会影响$Q(s’, a’)$的值,而在这里,Q-network用于近似整体状态空间下的动作值函数,当我们修改$Q(s, a, w)$的网络参数时,影响的是整体的值函数,从而影响$Q(s’, a’)$,也就是说我们在调整网络参数尝试靠近目标时,(网络参数的调整)同时也改变了我们希望靠近的目标$\; r + \gamma \; max_{a’} Q^{\star}(s’, a’, w)$,这与有监督学习的学习过程完全不同。

论文[2]针对第一个问题的解决方法称为:Experience Replay。为了打破状态之间的相关性,在内存中保存一定量的Transition $(s, a, r, s’)$,然后从里面随机采集一个样本batch用于优化损失函数,如下图所示:

针对第二个问题,问题根源在于参数更新发生在同一个Q-网络中,因此作者[2]引入另一个网络Target network,这个网络的作用是周期性的记录Q-network的参数,以保证Q-network上的参数更新时Target network上对应的$Q^{\star}(s’, a’, w)$不变,从而减小这两个网络的关联性。

于是优化的目标函数变为:

其中$w$表示发生参数更新的Q-network的权重参数,$w^-$表示Target-network 上次保存的Q-network的权重参数。算法整体的流程如下:

简单DQN实现

步骤

从上面的算法流程来看,要实现DQN的步骤如下所示:

  1. 搭建两个网络结构完全相同的网络:目标网络Target_Network和计算网络Evaluate_Network;
  2. Evaluate_Network根据当前的状态S选择动作A;
  3. 动作A作用于环境后得到Transition(S, A, R, S’),将该Transition存入Transition池中;
  4. 从Transition池中随机选择Batch_size个Transition样本,记为Batch_Sample,将Batch_Sample送入网络Target_Network和Evaluate_Network中,分别得到两组输出结果q_next和q_eval;
  5. 根据结果q_next计算Evaluate_Network需要逼近的目标值:
  6. 将结果q_target送入网络Evaluate_Network进行参数更新;
  7. 重复步骤2) — 6),每隔一定迭代次数将Evaluate_Network的网络参数复制给Target_Network.

实现

按照DQN算法步骤,整体算法可以用下面的图来说明,建议配合我绘制的这张图来分析对应部分的代码:

搭建网络

Evaluate_Network和Target_Network的结构需要保持完全一致,不同的地方在于前者需要定义损失函数和相应的优化器,而后者只是作为前者网络参数的暂存器,不需要更改网络参数。因为DQN需要周期性的将Evaluate_Network的各层网络参数复制给Target_Network,因此需要在两个网络中建立两个独立的collection,便于从中提取网络参数Tensor。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
class DeepQNetwork:
def build_net(self):
# Q学习需要用到的Transition为(S, A, R, S_)
# self.S是Evaluate_Network用来接收输入状态
# 参数shape=[None,self.n_features]表示输入状态的特征数为self.n_features,状态的个数为None,即未指定,因此可以是任意多个状态
self.S = tf.placeholder(tf.float32, [None, self.n_features], name='S')
# 计算损失值时需要用到,计算公式在步骤5)中
self.q_target = tf.placeholder(tf.float32, [None, self.n_actions], name='Q_Target')
# 1、搭建Evaluate_Network
with tf.variable_scope('Evaluate_Network'):
# 建立一个collection,用于网络参数的提取
# 某一层的参数需要提取时,对应的collection设置为c_names
c_names = ['eval_net_params', tf.GraphKeys.GLOBAL_VARIABLES]
num_layer1 = 10 # 第一个隐藏层的神经元个数
# 用于初始化网络权重
w_initializer = tf.random_normal_initializer(0., 0.5)
# 用于初始化网络偏置
b_initializer = tf.constant_initializer(0.5)
# 整个网络的参数对应上图左边网络的参数w
# 搭建Evaluate_Network的第一个隐藏层
with tf.variable_scope('Layer_1'):
w1 = tf.get_variable('w1', [self.n_features, num_layer1], initializer=w_initializer, collections=c_names)
b1 = tf.get_variable('b1', [1, num_layer1], initializer=b_initializer, collections=c_names)
out_layer1 = tf.nn.relu(tf.matmul(self.S, w1) + b1)
# 搭建Evaluate_Network的输出层
with tf.variable_scope('Layer_2'):
w2 = tf.get_variable('w2', [num_layer1, self.n_actions], initializer=w_initializer, collection=c_names)
b2 = tf.get_variable('b2', [1, self.n_actions], initializer=b_initializer, collection=c_names)
self.q_eval = tf.matmul(out_layer1, w2)+b2
# 根据期待输出q_target和当前输出q_eval计算MSE损失
with tf.variable_scope('loss'):
self.loss = tf.reduce_mean(tf.squared_difference(self.q_target, self.q_eval))
# 选择RMSProp优化算法进行优化更新
with tf.variable_scope('train'):
self.train_evalute_network = tf.train.RMSPropOptimizer(self.lr).minimize(self.loss)
# Target_Network需要的输入状态
self.S_ = tf.placeholder(tf.float32, [None, self.n_features], name='S_')
# 然后搭建与Evaluate_Network网络结构一样的Target_Network
with tf.variable_scope('Target_Network'):
# 定义一个与Evaluate_Network不同的collection
# 用于周期性的提取Target_Network各层网络的参数
c_names = ['target_net_params', tf.GraphKeys.GLOBAL_VARIABLES]
# 相同的网络结构,采用相同的权重初始化方式
# 由于上层variable_scope的不同,因此是独立的网络
# 整个网络的参数对应上图右边网络的参数w^-
with tf.variable_scope('Layer_1'):
w1 = tf.get_variable('w1', [self.n_features, num_layer1], initializer=w_initializer, collections=c_names)
b1 = tf.get_variable('b1', [num_layer1, self.n_actions], initializer=b_initializer, collections = c_names)
out_layer1 = tf.nn.relu(tf.matmul(self.S_, w1) + b1)
# Target_Network的输出层
# Target_Network作为Evaluate_Network参数的暂存网络
# 不需要进行参数优化,因此不需要定义损失函数和优化器
with tf.variable_scope('Layer_2'):
w2 = tf.get_variable('w2', [num_layer1, self.n_actions], initializer=w_initializer, collection=c_names)
b2 = tf.get_variable('b2', [1, self.n_actions], initializer = b_initializer, collection=c_names)
self.q_next = tf.matmul(out_layer1, w2) + b2

选择动作

这里采用的行为策略依旧是$\epsilon$-贪婪策略,因此与前一篇博文中Q-学习算法选择动作的思路一致,不同的地方在于,这里的Q函数是Evaluate_Network,因此需要在Session中进行前向传播,得到当前状态下的输出动作。需要注意的是需要将状态observation的shape转换成[1, self.n_features]的维度,方便输入到网络中(参见搭建网络时对self.S的说明)。

1
2
3
4
5
6
7
8
9
def choose_action(self, observation):
observation = observation[np.newaxis,:]
if np.random.uniform() < self.epsilon:
action_value = self.sess.run(self.q_eval, feed_dict={self.S : observation})
action = np.argmax(action_value)
else:
action = np.random.randint(0, self.n_actions)
return action

Transition Experience存储

存储Transition时,通过一个array存储,每一行对应一条(S,A,R,S_)记录。列的存储顺序上,该array的前后 self.n_features 列存储状态S和S’,第self.n_features+1列的位置存储动作A,紧挨着后面一列存储回报值R。行的存储顺序上,在给定最大行self.memory_size的array中从前往后循环存储,如果array存满,则重新从头开始覆盖掉旧的Transition进行存储。

1
2
3
4
5
6
7
8
9
10
def store_transition(self, S, A, R, S_):
if not hasattr(self, 'memory_counter'):
self.memory_counter = 0
# 按行的方向,将Transition拉成一维的向量,方便存储
transition = np.hstack((S, [A, R], S_))
index = self.memory_counter%self.memory_size
self.memory[index,:] = transition
self.memory_counter += 1

网络学习

这里包含了步骤4)—6),建议结合上面的示意图进行理解,特别是q_target部分的计算。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
def learn(self):
# 周期性更新Target_Network的参数
if self.learn_step_counter % self.replace_target_iter == 0:
self.replace_target_params()
print('\n target_params_replaced \n')
# 从Transition Experience中选择一个batch_size的样本
if self.memory_counter > self.memory_size:
sample_index = np.random.choice(self.memory_size, size=self.batch_size)
else:
sample_index = np.random.choice(self.memory_counter, size=self.batch_size)
batch_sample = self.memory[sample_index,:]
# 将样本中S部分输入到Evaluate_Network中,而
# 将样本中S_部分输入到Target_Network中,模仿1-step TD过程
q_next, q_eval = self.sess.run(
[self.q_next, self.q_eval],
feed_dict={
self.S_ : batch_sample[:, -self.n_features:]
self.S : batch_sample[:, :self.n_features]
}
)
# 建议结合上面的算法整体示意图来理解下面五行代码的操作过程
q_target = q_eval.copy()
batch_index = np.arange(self.batch_size, dtype=np.int32)
eval_act_index = batch_memory[:, self.n_features].astype(int)
reward = batch_memory[:, self.n_features+1]
# 结合离散MDP中Q-学习算法的Q函数更新方式来理解:我们只需要更新(S,A)对应的Q函数,eval_act_index正是在定位对应状态S下那个需要更新的A
q_target[batch_index, eval_act_index] = reward + self.gamma * np.max(q_next, axis=1)
# 训练Evaluate_Network并记录相应的损失值
_, self.cost = self.sess.run(
[self.train_evalute_network, self.loss],
feed_dict = {
self.S : batch_memory[:, :self.n_features],
self.q_target: q_target
}
)
self.cost_history.append(self.cost)
# 这一步是为了平衡策略的Exploration和Exploitation问题:设置一个较小的self.epsilon值,则在初期时行为策略会产生大量的探索行为(随机选择产生),然后逐步增加这个值,随机的探索行为会逐渐减少,使得行为策略会逐渐的逼近估计策略:贪婪策略。
self.epsilon = self.epsilon + self.epsilon_increment if self.epsilon < self.epsilon_max else self.epsilon_max
self.learn_step_counter += 1

网络参数复制

通过两个网络不同的collection分别得到两个网络各层参数的Tensor,再通过tf.assign()逐层的将Evaluate_Network的参数复制给Target_Network的对应层。

1
2
3
4
def replace_target_params(self):
target_params = tf.get_collection('target_net_params')
evaluate_params = tf.get_collection('eval_net_params')
self.sess.run( [ tf.assign(t, e) for t,e in zip(target_params, evaluate_params) ] )

参考文献

  1. A tutorial of deep reinforcement learning from Deepmind ;
  2. Human-level control through deep reinforcement learning
  3. 莫烦实现的DQN及其实验环境