强化学习小例子


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
# -*- coding: utf-8 -*-
import numpy as np
import pandas as pd
import time


## 全局变量申明
N_STATES = 100 # 1维世界的宽度 最开始的距离,比如宝藏距离初始点为6
ACTIONS = ['left', 'right'] # 探索者的可用动作 向左 向右走
EPSILON = 0.95 # 贪婪度 greedy 90%选择最有动作,10%选择随机
ALPHA = 0.5 # 学习率
LAMBDA = 0.9 # 奖励递减值,衰减度
MAX_EPISODES = 100 # 最大回合数 只玩MAX_EPISODES回合
FRESH_TIME = 0.001 # 移动间隔时间 用来规定一步的刷新时间

##建立Q-table
def build_q_table(n_states, actions):
table = pd.DataFrame(
np.zeros((n_states, len(actions))), # q_table 全 0 初始
columns=actions, # columns 对应的是行为名称
)
# print "q_table\n" ,table #
return table

# 在某个 state 地点, 选择行为
def choose_action(state, q_table):
# 选出这个 state 的所有 action 值
state_actions = q_table.iloc[state, :]
# 非贪婪 or 或者这个 state 还没有探索过
if (np.random.uniform() > EPSILON) or (state_actions.all() == 0):
action_name = np.random.choice(ACTIONS)
else:
action_name = state_actions.argmax() # 贪婪模式
return action_name

def get_env_feedback(S, A):
# 环境给行为一个反馈,输入上个 state (S) 做出 action (A) ,
# 反馈出下个 state (S_)和所得到的 reward (R).
if A == 'right': # 向右移动
if S == N_STATES - 2: # terminate
S_ = 'terminal'
R = 1
else:
S_ = S + 1 # 右移 状态+1
R = 0
else: # 向左移动
R = 0 # 未得到宝藏,R为0
if S == 0:
S_ = S # 最左侧无法移动,原地不动
else:
S_ = S - 1 # 左移 状态-1
return S_, R

def update_env(S, episode, step_counter):
# This is how environment be updated
env_list = ['-']*(N_STATES-1) + ['T'] # '---------T' our environment
if S == 'terminal':
interaction = 'Episode %s: total_steps = %s' % (episode+1, step_counter)
print('\r{}'.format(interaction))
print('\r ')
else:
env_list[S] = 'o'
interaction = ''.join(env_list)
#print('\r{}'.format(interaction))
time.sleep(FRESH_TIME)

def rl():
q_table = build_q_table(N_STATES, ACTIONS) # 初始 创建 q table
#for
for episode in range(MAX_EPISODES): # 回合
step_counter = 0
S = 0 # 回合初始位置 最左边
is_terminated = False # 是否游戏结束
update_env(S, episode, step_counter) # 环境更新
#while
while not is_terminated: # 当游戏继续
A = choose_action(S, q_table) # 选行为
S_, R = get_env_feedback(S, A) # 实施行为并得到环境的反馈
q_predict = q_table.ix[S, A] # 估算的(状态-行为)值,从Q表读出来的
if S_ != 'terminal':
q_target = R + LAMBDA * q_table.iloc[S_, :].max()
# 真实值:实际的(状态-行为)值 (回合没结束)
# 下一个状态的Qmax值
else:
q_target = R # 实际的(状态-行为)值 (回合结束)没有下一个Qmax,直接为R
is_terminated = True # terminate this episode
q_table.ix[S, A] += ALPHA * (q_target - q_predict) # q_table 更新
S = S_ # 探索者移动到下一个 state
update_env(S, episode, step_counter + 1) # 环境更新
step_counter += 1
# end while
# end for
#print q_table
return q_table


def getMax(q_table):
q_table['max'] = q_table['left'] > q_table['right']
for i in range(0,len(q_table)):
q_table.ix[i,'max'] = 'left' if q_table.ix[i,'max'] else 'right'
return q_table

q_table = rl()
print('\r\nQ-table:\n')
print getMax(q_table)