Coding-强化学习-策略梯度方法(PG)-控制倒立杆

摘要

使用策略梯度方法训练策略网络
1.play code
2.训练code
3.测试code

play code

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73

import gym
import pygame
import time
import random

"""
这个脚本是一个简单的 OpenAI Gym 库中 CartPole-v1 环境的实现。
游戏通过键盘上的左右箭头键进行控制。
目标是尽可能长时间地平衡一个移动小车上的杆。
如果杆倒下或小车移出边界,游戏将结束。
脚本使用 Pygame 处理键盘输入,并在每一步之间暂停游戏一小段时间以允许人类反应。
游戏将在每一步打印小车和杆的状态,并在游戏结束时显示总时间和步数。
"""

if __name__=='__main__':
pygame.init()
env = gym.make('CartPole-v1',render_mode='human')

state = env.reset()

cart_position = state[0]#小车的位置
cart_speed = state[1]#小车的速度
pole_angle = state[2]#杆的角度
pole_speed = state[3]#杆的尖端速度

print(f"Begin state: {state}")
print(f"cart_position = {cart_position:.2f}")
print(f"cart_speed = {cart_speed:.2f}")
print(f"pole_anglee=t{pole_angle:.2f}")
print(f"pole_speed = {pole_speed:.2f}")
time.sleep(3)#暂停环境3秒,给玩家一个缓冲时间

#实现游戏过程
start_time = time.time()#记录游戏的开始时间小
max_action = 10000 #设置游戏的最大执行次数
#最多让用户输入1000次方向键,游戏就以通关结束
#这里要说的是,1000次其实已经很多了,我大概最多就能坚持100多次
step = 0
fail = False
for step in range(1, max_action + 1):
#首先使用time.sleep,使游戏暂停θ.3秒,用于人的反应
time.sleep(0.3)
#如果觉得自己反应够快,这里可以设置更短的时间,比如0.1秒
#觉得自己反应慢,就设置更长的时间,比如1秒。
#以非阻塞的方式,接收用户的键盘输入
keys = pygame.key.get_pressed()
action = 0
if not keys[pygame.K_LEFT] and not keys[pygame.K_RIGHT]:
action = random.choice([0,1])

if keys[pygame.K_LEFT]:
action = 0
elif keys[pygame.K_RIGHT]:
action = 1

state,_,done,_ = env.step(action)

if done:
fail = True
break
#打印一些调试信息
print(f"step = {step} action = {action}"
f"angle = {state[2]:.2f} position = {state[0]:.2f}")
#计算结束时间
end_time = time.time()
game_time = end_time - start_time
#打印提示
if fail:
print(f"Game over, you play {game_time:.2f} seconds, {step} steps.")
else:
print(f"Congratulation! You play {game_time:.2f} seconds, {step} steps.")
env.close()

train code

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
import gym
import pygame
import time
import random
import torch

from torch import nn
import torch.nn.functional as F


import torch
import numpy as np
#实现损失函数的计算方法,函数传入步数n和这n步对应的概率Log-p

def compute_policy_loss(n, log_p):
#R就是q
r = list()
#构造奖励r的列表
for i in range(n, 0, -1):
r.append(i * 1.0)
r = torch.tensor(r)
r=(r-r.mean())/r.std()#进行标准化处理
loss = 0
#计算损失函数
for pi, ri in zip(log_p, r):
loss += -pi * ri
return loss

class CartPolePolicy(nn.Module):
def __init__(self):
super(CartPolePolicy, self).__init__()
#定义两个线性层,大小分别是4*128与128*2
self.fc1 = nn.Linear(4, 128)
self.fc2 = nn.Linear(128, 2)
#定义一个dropout层,丢弃比率是60%
self.drop = nn.Dropout(p=0.6)

def forward(self, x):
x = self.fc1(x)
x = self.drop(x)
x = F.relu(x)
x = self.fc2(x)
#使用softmax决策最终的行动,是向左还是向右
return F.softmax(x, dim = 1)

if __name__=='__main__':
pygame.init()
env = gym.make('CartPole-v1',render_mode='human')

state = env.reset()
policy = CartPolePolicy()
optimizer = torch.optim.Adam(policy.parameters(),lr=0.01)
max_episode=1000
max_action=10000
max_steps=5000


for episode in range(1, max_episode + 1):
#对于每一轮循环,都要重新启动一次游戏环境小黑黑讲
state = env.reset()
step =0
log_p = list()
for step in range(1, max_action + 1):
state = torch.from_numpy(state).float().unsqueeze(0)
probs=policy(state) #计算神经网络给出的行动概率
#基于网络给出的概率分布,随机选择行动,m是一个概率对象
m = torch.distributions.Categorical(probs)
action=m.sample()
# 这里并不是直接使用概率较大的行动
#而是通过概率分布生成action,这样可以进一步探索低概率的行动

state,_,done,_ = env.step(action.item())

if done:
break

log_p.append(m.log_prob(action))

if step>max_steps:#当step大于最大步数时
print(f"Done! Last episode {episode} Run steps: {step}")
break #跳出循环,结束训练

#每一回合游戏,都会做一次梯度下降算法
# Example of calculating the natural logarithm of an array using numpy


optimizer.zero_grad()
Loss = compute_policy_loss(step, log_p)
Loss.backward()
optimizer.step()
if episode % 10==0:
print(f'Episode {episode} Run steps: {step}')

torch.save(policy.state_dict(),'cartpole_policy.pth')

test code

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
import gym
import pygame
import time
import random
import torch

from torch import nn
import torch.nn.functional as F


import torch
#实现损失函数的计算方法,函数传入步数n和这n步对应的概率Log-p

def compute_policy_loss(n, log_p):
r = list()
#构造奖励r的列表
for i in range(n, 0, -1):
r.append(i * 1.0)
r = torch.tensor(r)
r=(r-r.mean())/r.std()#进行标准化处理
Loss = 0
#计算损失函数
for pi, ri in zip(log_p, r):
loss += -pi * ri
return loss

class CartPolePolicy(nn.Module):
def __init__(self):
super(CartPolePolicy, self).__init__()
#定义两个线性层,大小分别是4*128与128*2
self.fc1 = nn.Linear(4, 128)
self.fc2 = nn.Linear(128, 2)
#定义一个dropout层,丢弃比率是60%
self.drop = nn.Dropout(p=0.6)

def forward(self, x):
x = self.fc1(x)
x = self.drop(x)
x = F.relu(x)
x = self.fc2(x)
#使用softmax决策最终的行动,是向左还是向右
return F.softmax(x, dim = 1)

if __name__=='__main__':
pygame.init()
env = gym.make('CartPole-v1',render_mode='human')

state = env.reset()

cart_position = state[0]#小车的位置
cart_speed = state[1]#小车的速度
pole_angle = state[2]#杆的角度
pole_speed = state[3]#杆的尖端速度

print(f"Begin state: {state}")
print(f"cart_position = {cart_position:.2f}")
print(f"cart_speed = {cart_speed:.2f}")
print(f"pole_anglee=t{pole_angle:.2f}")
print(f"pole_speed = {pole_speed:.2f}")
time.sleep(3)#暂停环境3秒,给玩家一个缓冲时间

policy = CartPolePolicy()
policy.load_state_dict(torch.load('cartpole_policy.pth'))
policy.eval()
#实现游戏过程
start_time = time.time()#记录游戏的开始时间小
max_action = 1000 #设置游戏的最大执行次数
#最多让用户输入1000次方向键,游戏就以通关结束
step = 0
fail = False
for step in range(1, max_action + 1):
#首先使用time.sleep,使游戏暂停θ.3秒,用于人的反应
time.sleep(0.1)


state = torch.from_numpy(state).float().unsqueeze(0)
probs = policy(state)
action = torch.argmax(probs,dim=1).item()
state,_,done,_ = env.step(action)

if done:
fail = True
break
#打印一些调试信息
print(f"step = {step} action = {action}"
f"angle = {state[2]:.2f} position = {state[0]:.2f}")
#计算结束时间
end_time = time.time()
game_time = end_time - start_time
#打印提示
if fail:
print(f"Game over, you play {game_time:.2f} seconds, {step} steps.")
else:
print(f"Congratulation! You play {game_time:.2f} seconds, {step} steps.")
env.close()