Coding - Reinforcement Learning - Policy Gradient (PG) - Controlling the CartPole
Summary
Train a policy network with the policy gradient method. A short sketch of the REINFORCE loss that drives the training follows the list below.
1. Play code
2. Training code
3. Test code
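Before the code, here is the idea behind the loss used in the training script. REINFORCE weights the log-probability of each sampled action by the return collected from that step onward; in CartPole every surviving step yields a reward of 1, so the return from step t of an n-step episode is simply the number of remaining steps, n - t + 1. Below is a minimal sketch of that loss on a hypothetical 5-step episode; the names returns and log_probs are illustrative only and do not appear in the scripts that follow.

import torch

# Hypothetical 5-step episode: reward is 1 per step, so the return from
# step t is the number of remaining steps (5, 4, 3, 2, 1).
n = 5
returns = torch.tensor([float(i) for i in range(n, 0, -1)])
returns = (returns - returns.mean()) / returns.std()   # normalize for stability

# In the real script these come from Categorical(probs).log_prob(action);
# placeholder leaf tensors are used here just to show the loss shape.
log_probs = [torch.tensor(-0.7, requires_grad=True) for _ in range(n)]

# REINFORCE loss: minimize the negative of sum(log_prob * return).
loss = sum(-lp * r for lp, r in zip(log_probs, returns))
loss.backward()   # gradients would flow back into the policy parameters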
Play code
import gym
import pygame
import time
import random

"""
This script lets a human play the CartPole-v1 environment from the OpenAI Gym library.
The game is controlled with the left and right arrow keys.
The goal is to keep the pole on the moving cart balanced for as long as possible.
If the pole falls over or the cart leaves the track, the game ends.
Pygame handles the keyboard input, and the game pauses briefly between steps to give a human time to react.
The cart and pole state is printed at every step, and the total time and step count are shown when the game ends.
"""

# NOTE: this script assumes a Gym version where reset() returns the observation
# directly and step() returns 4 values (observation, reward, done, info).
if __name__ == '__main__':
    pygame.init()
    env = gym.make('CartPole-v1', render_mode='human')
    state = env.reset()
    cart_position = state[0]
    cart_speed = state[1]
    pole_angle = state[2]
    pole_speed = state[3]
    print(f"Begin state: {state}")
    print(f"cart_position = {cart_position:.2f}")
    print(f"cart_speed = {cart_speed:.2f}")
    print(f"pole_angle = {pole_angle:.2f}")
    print(f"pole_speed = {pole_speed:.2f}")
    time.sleep(3)

    start_time = time.time()
    max_action = 10000
    step = 0
    fail = False
    for step in range(1, max_action + 1):
        time.sleep(0.3)
        keys = pygame.key.get_pressed()
        action = 0
        # With no key pressed, pick a random action; otherwise follow the arrow keys.
        if not keys[pygame.K_LEFT] and not keys[pygame.K_RIGHT]:
            action = random.choice([0, 1])
        if keys[pygame.K_LEFT]:
            action = 0
        elif keys[pygame.K_RIGHT]:
            action = 1
        state, _, done, _ = env.step(action)
        if done:
            fail = True
            break
        print(f"step = {step} action = {action} "
              f"angle = {state[2]:.2f} position = {state[0]:.2f}")

    end_time = time.time()
    game_time = end_time - start_time
    if fail:
        print(f"Game over, you played {game_time:.2f} seconds, {step} steps.")
    else:
        print(f"Congratulations! You played {game_time:.2f} seconds, {step} steps.")
    env.close()
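The script above targets the older Gym return signatures. If you are on Gymnasium (or gym >= 0.26), reset() and step() return extra values; a minimal adaptation sketch, assuming the gymnasium package is installed:

import gymnasium as gym   # assumption: gymnasium (or gym >= 0.26) is installed

env = gym.make('CartPole-v1', render_mode='human')
state, info = env.reset()                                  # reset() returns (observation, info)
state, reward, terminated, truncated, info = env.step(0)   # step() returns 5 values
done = terminated or truncated                             # the old "done" flag is split in two
env.close()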
Training code
import gym
import pygame
import time
import random
import torch
from torch import nn
import torch.nn.functional as F
import numpy as np


def compute_policy_loss(n, log_p):
    # The return for step t is the number of remaining steps (n, n-1, ..., 1),
    # normalized to zero mean and unit variance before weighting the log-probabilities.
    r = list()
    for i in range(n, 0, -1):
        r.append(i * 1.0)
    r = torch.tensor(r)
    r = (r - r.mean()) / r.std()
    loss = 0
    for pi, ri in zip(log_p, r):
        loss += -pi * ri
    return loss


class CartPolePolicy(nn.Module):
    def __init__(self):
        super(CartPolePolicy, self).__init__()
        self.fc1 = nn.Linear(4, 128)
        self.fc2 = nn.Linear(128, 2)
        self.drop = nn.Dropout(p=0.6)

    def forward(self, x):
        x = self.fc1(x)
        x = self.drop(x)
        x = F.relu(x)
        x = self.fc2(x)
        return F.softmax(x, dim=1)


if __name__ == '__main__':
    pygame.init()
    # Rendering during training is slow; drop render_mode='human' to train faster.
    env = gym.make('CartPole-v1', render_mode='human')
    state = env.reset()
    policy = CartPolePolicy()
    optimizer = torch.optim.Adam(policy.parameters(), lr=0.01)
    max_episode = 1000
    max_action = 10000
    max_steps = 5000
    for episode in range(1, max_episode + 1):
        state = env.reset()
        step = 0
        log_p = list()
        # Roll out one episode, recording the log-probability of every sampled action.
        for step in range(1, max_action + 1):
            state = torch.from_numpy(state).float().unsqueeze(0)
            probs = policy(state)
            m = torch.distributions.Categorical(probs)
            action = m.sample()
            state, _, done, _ = env.step(action.item())
            if done:
                break
            log_p.append(m.log_prob(action))
        # Stop training once the policy keeps the pole up for more than max_steps steps.
        if step > max_steps:
            print(f"Done! Last episode {episode} Run steps: {step}")
            break
        optimizer.zero_grad()
        loss = compute_policy_loss(step, log_p)
        loss.backward()
        optimizer.step()
        if episode % 10 == 0:
            print(f'Episode {episode} Run steps: {step}')
    torch.save(policy.state_dict(), 'cartpole_policy.pth')
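compute_policy_loss uses the undiscounted number of remaining steps as the return. A common variant discounts future rewards with a factor gamma; the helper below is a minimal sketch of that variant (the name compute_discounted_returns and the gamma value are illustrative, not part of the scripts in this post).

import torch

def compute_discounted_returns(rewards, gamma=0.99):
    # Return-to-go with discounting: G_t = r_t + gamma * G_{t+1}.
    returns = []
    g = 0.0
    for r in reversed(rewards):
        g = r + gamma * g
        returns.insert(0, g)
    returns = torch.tensor(returns)
    # Normalize, adding a small epsilon to avoid division by zero.
    return (returns - returns.mean()) / (returns.std() + 1e-8)

# Example: a 5-step CartPole episode with reward 1 at every step.
print(compute_discounted_returns([1.0] * 5, gamma=0.99))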
Test code
import gym
import pygame
import time
import random
import torch
from torch import nn
import torch.nn.functional as F


def compute_policy_loss(n, log_p):
    # Same loss as in the training script (unused during testing, kept for reference).
    r = list()
    for i in range(n, 0, -1):
        r.append(i * 1.0)
    r = torch.tensor(r)
    r = (r - r.mean()) / r.std()
    loss = 0
    for pi, ri in zip(log_p, r):
        loss += -pi * ri
    return loss


class CartPolePolicy(nn.Module):
    def __init__(self):
        super(CartPolePolicy, self).__init__()
        self.fc1 = nn.Linear(4, 128)
        self.fc2 = nn.Linear(128, 2)
        self.drop = nn.Dropout(p=0.6)

    def forward(self, x):
        x = self.fc1(x)
        x = self.drop(x)
        x = F.relu(x)
        x = self.fc2(x)
        return F.softmax(x, dim=1)


if __name__ == '__main__':
    pygame.init()
    env = gym.make('CartPole-v1', render_mode='human')
    state = env.reset()
    cart_position = state[0]
    cart_speed = state[1]
    pole_angle = state[2]
    pole_speed = state[3]
    print(f"Begin state: {state}")
    print(f"cart_position = {cart_position:.2f}")
    print(f"cart_speed = {cart_speed:.2f}")
    print(f"pole_angle = {pole_angle:.2f}")
    print(f"pole_speed = {pole_speed:.2f}")
    time.sleep(3)

    # Load the trained policy and run it greedily (argmax over action probabilities).
    policy = CartPolePolicy()
    policy.load_state_dict(torch.load('cartpole_policy.pth'))
    policy.eval()

    start_time = time.time()
    max_action = 1000
    step = 0
    fail = False
    for step in range(1, max_action + 1):
        time.sleep(0.1)
        state = torch.from_numpy(state).float().unsqueeze(0)
        probs = policy(state)
        action = torch.argmax(probs, dim=1).item()
        state, _, done, _ = env.step(action)
        if done:
            fail = True
            break
        print(f"step = {step} action = {action} "
              f"angle = {state[2]:.2f} position = {state[0]:.2f}")

    end_time = time.time()
    game_time = end_time - start_time
    if fail:
        print(f"Game over, you played {game_time:.2f} seconds, {step} steps.")
    else:
        print(f"Congratulations! You played {game_time:.2f} seconds, {step} steps.")
    env.close()
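To get a more reliable picture than a single rendered run, you could evaluate the trained policy over several episodes without rendering and report the average episode length. The loop below is a hypothetical evaluation sketch, not part of the original scripts; it assumes the CartPolePolicy class and the cartpole_policy.pth checkpoint from above, and the older Gym reset/step signatures.

import gym
import torch

# Assumes CartPolePolicy is defined as in the scripts above and that
# 'cartpole_policy.pth' exists on disk.
env = gym.make('CartPole-v1')
policy = CartPolePolicy()
policy.load_state_dict(torch.load('cartpole_policy.pth'))
policy.eval()

episodes = 20
total_steps = 0
for _ in range(episodes):
    state = env.reset()
    done = False
    steps = 0
    while not done and steps < 1000:
        with torch.no_grad():
            probs = policy(torch.from_numpy(state).float().unsqueeze(0))
        action = torch.argmax(probs, dim=1).item()
        state, _, done, _ = env.step(action)
        steps += 1
    total_steps += steps

print(f"Average steps over {episodes} episodes: {total_steps / episodes:.1f}")
env.close()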