强化学习(四):基本算法实现

整体思路

我们选择Q-学习、Sarsa学习、Sarsa($\lambda$)学习、Q($\lambda$)学习这四种能够在线增量式学习的算法进行实现,它们对应的算法流程均由下列步骤组成:
1、初始化操作;
2、从当前状态出发,根据选择的行为策略,发出相应的动作;
3、动作作用于环境,获得Transiton;
4、根据Transiton更新Q函数;
5、重复执行2)—4)步。

因此,因此在算法中,我们需要:
1、初始化的算法的超参数、Q函数、资格迹Z(S,A);
2、确定产生Trajectory的行为策略;
3、定义选择动作的函数;
4、定义环境对于每个(状态,动作)的返回值函数;
5、定义相应的更新Q函数的函数;

为了平衡Exploration和Exploitation的问题,四个算法的行为策略均采用$\epsilon$-贪婪策略:以$\epsilon$的概率取当前最佳的动作,以$1-\epsilon$的概率随机从所有可能的动作中选择。

参数$\epsilon$需要初始化,各个算法其余需要初始化的参数与它们各自的Q函数更新方式有关,下面是四种算法核心的Q函数更新公式:
Q-学习

Sarsa学习:

Sarsa($\lambda$)学习:

Q($\lambda$)学习:

其中,Sarsa($\lambda$)的资格迹可以选择如下两种形式:

Q($\lambda$)学习的资格迹定义为:

可以看到,四种算法均需要:学习率$\alpha$、折扣因子$\gamma$、贪婪参数$\epsilon$,而对于Q($\lambda$)学习和Sarsa($\lambda$)学习算法,还需要表示资格迹衰减的因子$\lambda$。

因为四个算法大部分的参数一致且使用同一个行为策略,因此考虑将这部分内容实现在父类RL_main中,四个算法不同的Q函数更新函数则在各自的子类中实现,因此整体代码的框架如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
class RL_main(object):
# 三个公共参数:学习率、折扣因子、贪婪参数,在父类中初始化;
# 参数action_spaces表示所有可能取的动作,用于初始化Q表格的列;
def __init__(self,action_spaces,learning_rate=0.01,reward_decay=0.7,e_greedy=0.9):
self.actions = action_spaces # 动作空间
self.lr = learning_rate # 学习率alpha
self.gamma = reward_decay # 折扣因子gamma
self.epsilon = e_greedy # 贪婪策略epsilon
self.q_table = pd.DataFrame(columns=self.actions)
# 检查状态state是否在Q表格中,若无,则添加相应的行
def check_state_exist(self, state):
.....
# 行为策略函数
def choose_action(self, observation):
self.check_state_exist(observation)
.....
# Q(表格)函数的更新函数,在各个子类中实现
def learning(self, *args):
pass
# Q-学习算法子类
class QLearningTable(RL_main):
def __init__(self,actions,learning_rate=0.02,reward_decay=0.8,e_greedy=0.9):
# 直接调用父类的__init__函数初始化对象
super(QLearningTable, self).__init__(actions,learning_rate,reward_decay,e_greedy)
# Q-学习算法更新Q-函数
def learning(self,s,a,r,s_):
self.check_state_exist(s_)
.....
# Sarsa学习算法子类
class SarsaTable(RL_main):
def __init__(self,actions,learning_rate=0.02,reward_decay=0.8,e_greedy=0.9):
# 直接调用父类的__init__方法初始化对象
super(SarsaTable,self).__init__(actions,learning_rate,reward_decay,e_greedy)
# Sarsa学习算法更新Q函数
def learning(self,s,a,r,s_,a_):
self.check_state_exist(s_)
.....
# Sarsa(lambda)学习算法子类
class SarsaLambdaTable(RL_main):
# 需要多传入表示资格迹衰减速度的因子;
# 需要额外初始化表示资格迹的表格;
def __init__(self,actions,learning_rate=0.01,reward_decay=0.9,e_greedy=0.9,trace_decay=0.9):
super(SarsaLambdaTable,self).__init__(actions,learning_rate,reward_decay,e_greedy)
self.lambda_ = trace_decay # 资格迹衰减因子
self.eligibility_trace = self.q_table.copy()
# 因为多了资格迹表格,因此需要重写父类的check_state_exist函数;
def check_state_exist(self,state):
.....
# Sarsa(lambda)学习算法更新Q函数
def learning(self,s,a,r,s_,a_):
.....
# Watkins‘s Q(lambda)学习算法子类
class QLearningLambdaTable(RL_main):
# 类似于Sarsa子类,需要多传入一个参数和初始化资格迹表格;
def __init__(self,actions,learning_rate=0.02,reward_decay=0.9,e_greedy=0.9,trace_decay=0.9):
super(QLearningLambdaTable,self).__init__(actions,learning_rate,reward_decay,e_greedy)
self.lambda_ = trace_decay # 资格迹衰减因子
self.eligibility_trace = self.q_table.copy()
# 重写继承自父类的check_state_exist函数
def check_state_exist(self,state):
.....
# Watkins‘s Q(lambda)学习算法更新Q函数
def learning(self,s,a,r,s_,a_):
.....

实现父类

接下来先实现父类中的动作选择函数choose_action():

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
class RL_main(object):
def __init__(self,action_spaces,learning_rate=0.01,reward_decay=0.7,e_greedy=0.9):
......
def check_state_exist(self, state):
......
def choose_action(self, observation):
# 检查当前状态observation是否在Q表格中,若无,则增加并初始化相应的行
self.check_state_exist(observation)
# 按照贪婪策略选择动作
if np.random.rand() < self.epsilon:
state_action = self.q_table.ix[observation, :]
state_action = state_action.reindex(np.random.permutation(state_action.index))
action = state_action.argmax()
# 随机选择动作
else:
action = np.random.choice(self.actions)
return action

状态检查函数如下:

1
2
3
4
5
6
7
8
9
10
# 若状态state不在Q表格中,则新建一行相应的状态记录,初始化所有的Q(state, )为0;
def check_state_exist(self, state):
if state not in self.q_table.index:
self.q_table = self.q_table.append(
pd.Series(
[0]*len(self.actions),
index = self.q_table.columns,
name = state,
)
)

实现子类Q学习

Q-学习子类只需要实现相应的Q表格更新函数,根据它的更新表达式:

相应的learning函数实现方式如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
# 更新所需要的Transition为(S,A,R,S')
def learning(self,s,a,r,s_):
self.check_state_exist(s_)
# 上一个时刻对应的Q值
q_predict = self.q_table.ix[s, a]
# 如果没有到达终止状态,
# 则根据公式中括号部分计算我们所希望达到的Q值
if s_ != 'terminal':
q_target = r + self.gamma * self.q_table.ix[s_,:].max()
# 如果达到终止状态,则不对Q进行更新
else:
q_target = r
# 根据Q学习的Q函数更新公式进行更新
self.q_table.ix[s,a] += self.lr * (q_target - q_predict)

实现子类Sarsa学习

同样,根据Sarsa学习算法的Q函数更新公式:

Sarsa算法与Q学习算法非常接近,唯一的区别就在状态$S’$时选择动作的方式不同。相应的learning函数实现为:

1
2
3
4
5
6
7
8
9
# 更新Q函数需要的Transition为(S,A,R,S',A')
def learning(self,s,a,r,s_,a_):
self.check_state_exist(s_)
q_predict = self.q_table.ix[s,a]
if s_ != 'terminal':
q_target = r + self.gamma * self.q_table.ix[s_,a_]
else:
q_target = r
self.q_table += self.lr * (q_target - q_predict)

实现子类Sarsa($\lambda$)

n-step下的Sarsa算法增加了每个(状态,动作)的资格迹计算,因此还需要在check_state_exist()函数中检查资格迹表格 self.eligibility_trace 是否存在对应状态的记录行,若没有,则跟Q表格类似,需要增加相应状态对应的资格迹记录行。因为Z(S,A)和Q(S,A)是在相同的状态集$S$和动作集$A$进行描述,差异仅仅在于记录的描述不同,因此初始化时使用相同的新增记录行。具体实现如下:

1
2
3
4
5
def check_state_exist(self,state):
if state not in self.q_table.index:
appended = pd.Series([0]*len(self.actions), index=self.q_table.columns, name=state)
self.q_table = self.q_table.append(appended)
self.eligibility_trace = self.eligibility_trace.append(appended)

Q函数更新遵循的表达式为:

两种可选的资格迹定义为:

我们同时在learning函数中实现这两种定义

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
# 更新Q函数需要的Transition为(S,A,R,S',A')
def learning(self,s,a,r,s_,a_):
self.check_state_exist(s_)
# 首先计算除资格迹之外的部分,这一部分与Sarsa学习算法保持一致
q_predict = self.q_table.ix[s,a]
if s_ != 'terminal':
q_target = r + self.gamma * self.q_table.ix[s_,a_]
else:
q_target = r
predict_error = q_target - q_predict
# 然后计算当前需要的资格迹值 Z_t(S,A),
# 需要注意的是 Z_{t-1}(S,A)已经在上一次(S,A)出现时进行了更新,
# 关于这一点的详细说明请参照本博文的Sarsa(lambda)部分的解释
# 累积式的资格迹定义方式
# self.eligibility_trace.ix[s,a] += 1
# 替代式的资格迹定义方式
self.eligibility_trace.ix[s,:] *= 0
self.eligibility_trace.ix[s,a] = 1
# 根据更新公式,完成Q值的更新
self.q_table += self.lr * predict_error * self.eligibility_trace
# 提前计算当前时刻(S,A)与两个参数的乘积
self.eligibility_trace *= self.gamma * self.lambda_

实现子类Q($\lambda$)

Q($\lambda$)对应的Q值更新表达式为:

Q($\lambda$)学习算法对应的资格迹定义如下所示,关于它的理解请参照本博文讲述Q($\lambda$)算法的小节:

与Sarsa($\lambda$)类似,需要重写继承自父类的函数check_state_exist(),实现方式与Sarsa($\lambda$)保持一致,这里只给出learning函数的实现方式:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
# 更新Q值所需要的Transition为(S,A,R,S,A')
# 其中 A' 仅仅用于资格迹的计算
def learning(self,s,a,r,s_,a_):
# 首先计算除资格迹之外的部分,这一部分与Q学习算法保持一致
self.check_state_exist(s_)
q_predict = self.q_table.ix[s,a]
if s_ != 'terminal':
q_target = r + self.gamma * self.q_table.ix[s_,:].max()
else:
q_target = r
predict_error = q_target - q_predict
# 计算当前(S,A)的资格迹,并完成Q值的更新
self.eligibility_trace.ix[s, a] += 1
self.q_table += self.lr * predict_error * self.eligibility_trace
# 根据资格迹的定义预计算和更新下一时刻的资格迹值
# 首先根据贪婪策略,选择当前状态下的最佳动作
state_action = self.q_table.ix[s_, :]
state_action = state_action.reindex(np.random.permutation(state_action.index))
action = state_action.argmax()
# 然后判断当前实际取的动作和贪婪策略下选择的动作一致
# 如果一致,则将所有的(s,a)对乘上两个因子
if a_ == action:
self.eligibility_trace *= self.gamma * self.lambda_
# 如果不一致,即当前实际的动作为non-greedy动作,
# 则将所有的(s,a)重置为0,当前时刻的(s,a)资格迹会在同一个状态和动作的下一个时刻加上1
else:
self.eligibility_trace *= 0

根据Sarsa($\lambda$)和Q($\lambda$)的算法伪代码,每一个episode后的资格迹需要重新计算,因此在每个episode调用这两个子类的learning之前,需要重置资格迹表格元素值全部为0.

运行

算法的运行请参照莫烦大神的代码,上面四个算法中,Q-学习、Sarsa学习和Sarsa($\lambda$)的代码来自于莫烦的实现,但是莫烦没有实现Q($\lambda$),因此在它的基础上,实现了Watkins提出的Q($\lambda$)版本的实现。

参考文献

1、Richard S. Sutton. Reinforcement learning An introduction. Second edition. MIT Press;
2、莫烦实现的前三个算法;