1. Abstraction
Every algorithm implementation carries out the agent-environment interaction and the training inside main(). We therefore abstract this function into a common template:
```python
def main():
    '''
    Create the game environment with gym
    Create the replay buffer
    Create and initialize the network(s)
    Initialize the update interval
    Build an optimizer for the network
    Main loop (over episodes):
        Reset the env
        While done is False:
            Sample an action, and apply any necessary processing to it
            env.step(action)  # the agent takes the action and interacts with the env
            Store the sample in the buffer
            If the buffer size exceeds the threshold:
                Train the network for several rounds, backpropagating on batch_size samples to update the parameters
    Close the env
    '''
```
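To make the template concrete, here is a minimal sketch of my own (not from the original post), using a random policy on CartPole and the classic gym API in which `reset()` returns the observation and `step()` returns four values:

```python
import gym

def main():
    env = gym.make('CartPole-v1')                  # create the game environment with gym
    # the buffer, network, and optimizer would be created here
    for n_epi in range(10):                        # main loop over episodes
        s = env.reset()
        done = False
        while not done:
            a = env.action_space.sample()          # sample an action (random policy as a placeholder)
            s_prime, r, done, info = env.step(a)   # the agent takes the action and interacts with the env
            # store (s, a, r, s_prime, done) in the buffer; once it is large
            # enough, train the network on mini-batches of batch_size samples
            s = s_prime
    env.close()

if __name__ == '__main__':
    main()
```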
2. Algorithms
This section goes through each algorithm, comparing the code against its pseudocode and studying the implementation.
2.1 DQN
The NIPS 2013 version of DQN:
For the CartPole problem we simplify further:
* No preprocessing is needed: the observation is used directly as the state input.
* The four elements of the CartPole observation are the cart position, cart velocity, pole angle, and pole angular velocity (a quick check is sketched below).
* Only a basic MLP is used, rather than a convolutional neural network.
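A quick check of my own (assuming the classic gym API, where `reset()` returns the observation array):

```python
import gym
import torch

env = gym.make('CartPole-v1')
s = env.reset()                   # observation comes back as a NumPy array
print(s.shape)                    # (4,): cart position, cart velocity, pole angle, pole angular velocity
x = torch.from_numpy(s).float()   # fed directly into the MLP as the state
env.close()
```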
Code for the training step of the algorithm:

```python
def train(q, q_target, memory, optimizer):
    for i in range(10):
        s, a, r, s_prime, done_mask = memory.sample(batch_size)

        q_out = q(s)
        q_a = q_out.gather(1, a)                                # Q(s, a) for the actions actually taken
        max_q_prime = q_target(s_prime).max(1)[0].unsqueeze(1)  # max_a' Q_target(s', a')
        target = r + gamma * max_q_prime * done_mask            # TD target; done_mask is 0 at terminal states
        loss = F.smooth_l1_loss(q_a, target)

        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
```
Network:

```python
class Qnet(nn.Module):
    def __init__(self):
        super(Qnet, self).__init__()
        self.fc1 = nn.Linear(4, 128)
        self.fc2 = nn.Linear(128, 128)
        self.fc3 = nn.Linear(128, 2)

    def forward(self, x):
        x = F.relu(self.fc1(x))
        x = F.relu(self.fc2(x))
        x = self.fc3(x)
        return x

    def sample_action(self, obs, epsilon):
        out = self.forward(obs)
        coin = random.random()
        if coin < epsilon:
            return random.randint(0, 1)
        else:
            return out.argmax().item()
```
Notes:

```python
# linearly anneal epsilon from 8% down to 1% (reached around episode 1400)
epsilon = max(0.01, 0.08 - 0.01 * (n_epi / 200))
```

```python
# done_mask is 0 at terminal states so the bootstrap term is dropped;
# the reward is scaled by 1/100 to keep the targets small
done_mask = 0.0 if done else 1.0
memory.put((s, a, r / 100.0, s_prime, done_mask))
```
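The buffer behind `memory.put(...)` and `memory.sample(batch_size)` is not shown above. A minimal sketch of a compatible replay buffer (my own; `buffer_limit` is an assumed value):

```python
import collections
import random
import torch

class ReplayBuffer():
    def __init__(self, buffer_limit=50000):
        self.buffer = collections.deque(maxlen=buffer_limit)

    def put(self, transition):
        # transition = (s, a, r, s_prime, done_mask)
        self.buffer.append(transition)

    def sample(self, n):
        mini_batch = random.sample(self.buffer, n)
        s_lst, a_lst, r_lst, s_prime_lst, done_mask_lst = [], [], [], [], []
        for s, a, r, s_prime, done_mask in mini_batch:
            s_lst.append(s)
            a_lst.append([a])
            r_lst.append([r])
            s_prime_lst.append(s_prime)
            done_mask_lst.append([done_mask])
        return torch.tensor(s_lst, dtype=torch.float), torch.tensor(a_lst), \
               torch.tensor(r_lst), torch.tensor(s_prime_lst, dtype=torch.float), \
               torch.tensor(done_mask_lst)

    def size(self):
        return len(self.buffer)
```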
2.2 Actor-Critic
2.3 Advantage Actor-Critic [A2C]
Code for the algorithm:
```python
envs = ParallelEnv(n_train_processes)
model = ActorCritic()
optimizer = optim.Adam(model.parameters(), lr=learning_rate)

step_idx = 0
s = envs.reset()
print('s', s)
while step_idx < max_train_steps:
    # collect a rollout of update_interval steps from the parallel envs
    s_lst, a_lst, r_lst, mask_lst = list(), list(), list(), list()
    for _ in range(update_interval):
        prob = model.pi(torch.from_numpy(s).float())
        a = Categorical(prob).sample().numpy()
        s_prime, r, done, info = envs.step(a)

        s_lst.append(s)
        a_lst.append(a)
        r_lst.append(r / 100.0)       # reward scaled by 1/100
        mask_lst.append(1 - done)     # mask is 0 at terminal steps

        s = s_prime
        step_idx += 1

    # bootstrap from the value of the final states, then build the TD targets
    s_final = torch.from_numpy(s_prime).float()
    v_final = model.v(s_final).detach().clone().numpy()
    td_target = compute_target(v_final, r_lst, mask_lst)

    td_target_vec = td_target.reshape(-1)
    s_vec = torch.tensor(s_lst).float().reshape(-1, 4)
    a_vec = torch.tensor(a_lst).reshape(-1).unsqueeze(1)
    advantage = td_target_vec - model.v(s_vec).reshape(-1)

    # policy-gradient loss with a detached advantage, plus the value regression loss
    pi = model.pi(s_vec, softmax_dim=1)
    pi_a = pi.gather(1, a_vec).reshape(-1)
    loss = -(torch.log(pi_a) * advantage.detach()).mean() + \
        F.smooth_l1_loss(model.v(s_vec).reshape(-1), td_target_vec)

    optimizer.zero_grad()
    loss.backward()
    optimizer.step()

    if step_idx % PRINT_INTERVAL == 0:
        test(step_idx, model)

envs.close()
```
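`compute_target` is not shown in the excerpt above. A minimal sketch of my own (assuming an n-step bootstrapped return computed backwards through the rollout; the `gamma` default is an assumed value):

```python
import torch

def compute_target(v_final, r_lst, mask_lst, gamma=0.98):
    G = v_final.reshape(-1)          # bootstrap from the value of the final states
    td_target = list()

    # walk the rollout backwards, accumulating discounted returns;
    # a mask of 0 at a terminal step cuts the bootstrap off
    for r, mask in zip(r_lst[::-1], mask_lst[::-1]):
        G = r + gamma * G * mask
        td_target.append(G)

    return torch.tensor(td_target[::-1]).float()
```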
Network:

```python
class ActorCritic(nn.Module):
    def __init__(self):
        super(ActorCritic, self).__init__()
        self.fc1 = nn.Linear(4, 256)      # shared first layer for actor and critic
        self.fc_pi = nn.Linear(256, 2)    # policy head
        self.fc_v = nn.Linear(256, 1)     # value head

    def pi(self, x, softmax_dim=1):
        x = F.relu(self.fc1(x))
        x = self.fc_pi(x)
        prob = F.softmax(x, dim=softmax_dim)
        return prob

    def v(self, x):
        x = F.relu(self.fc1(x))
        v = self.fc_v(x)
        return v
```
2.4 A3C
2.5 DDPG
Code for the training step of the algorithm:

```python
# inside the main loop: one training call, followed by soft updates of the targets
train(mu, mu_target, q, q_target, memory, q_optimizer, mu_optimizer)
soft_update(mu, mu_target)
soft_update(q, q_target)

def train(mu, mu_target, q, q_target, memory, q_optimizer, mu_optimizer):
    s, a, r, s_prime, done_mask = memory.sample(batch_size)

    # critic: TD target built from the target actor and target critic
    target = r + gamma * q_target(s_prime, mu_target(s_prime)) * done_mask
    q_loss = F.smooth_l1_loss(q(s, a), target.detach())
    q_optimizer.zero_grad()
    q_loss.backward()
    q_optimizer.step()

    # actor: maximize Q(s, mu(s)) by minimizing its negative
    mu_loss = -q(s, mu(s)).mean()
    mu_optimizer.zero_grad()
    mu_loss.backward()
    mu_optimizer.step()

def soft_update(net, net_target):
    # Polyak averaging: move the target parameters slowly toward the online parameters
    for param_target, param in zip(net_target.parameters(), net.parameters()):
        param_target.data.copy_(param_target.data * (1.0 - tau) + param.data * tau)
```
Network:

```python
class MuNet(nn.Module):
    def __init__(self):
        super(MuNet, self).__init__()
        self.fc1 = nn.Linear(3, 128)
        self.fc2 = nn.Linear(128, 64)
        self.fc_mu = nn.Linear(64, 1)

    def forward(self, x):
        x = F.relu(self.fc1(x))
        x = F.relu(self.fc2(x))
        mu = torch.tanh(self.fc_mu(x)) * 2   # scale the tanh output from [-1, 1] to [-2, 2] (the env's action range)
        return mu

class QNet(nn.Module):
    def __init__(self):
        super(QNet, self).__init__()
        self.fc_s = nn.Linear(3, 64)
        self.fc_a = nn.Linear(1, 64)
        self.fc_q = nn.Linear(128, 32)
        self.fc_out = nn.Linear(32, 1)

    def forward(self, x, a):
        h1 = F.relu(self.fc_s(x))
        h2 = F.relu(self.fc_a(a))
        cat = torch.cat([h1, h2], dim=1)     # concatenate state and action features
        q = F.relu(self.fc_q(cat))
        q = self.fc_out(q)
        return q
```
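The excerpts above do not show how the target networks are initialized relative to the online networks. A small sketch of my own of the usual setup that `soft_update` then maintains (the `tau` value is an assumption):

```python
q, q_target = QNet(), QNet()
mu, mu_target = MuNet(), MuNet()

# start the targets as exact copies of the online networks (hard copy),
# then let soft_update track the online networks slowly
q_target.load_state_dict(q.state_dict())
mu_target.load_state_dict(mu.state_dict())

tau = 0.005   # small soft-update rate
```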
Note:

```python
class OrnsteinUhlenbeckNoise:
    def __init__(self, mu):
        self.theta, self.dt, self.sigma = 0.1, 0.01, 0.1
        self.mu = mu
        self.x_prev = np.zeros_like(self.mu)

    def __call__(self):
        x = self.x_prev + self.theta * (self.mu - self.x_prev) * self.dt + \
            self.sigma * np.sqrt(self.dt) * np.random.normal(size=self.mu.shape)
        self.x_prev = x
        return x

ou_noise = OrnsteinUhlenbeckNoise(mu=np.zeros(1))

while not done:
    a = mu(torch.from_numpy(s).float())
    a = a.item() + ou_noise()[0]   # add temporally correlated noise for exploration in the continuous action space
```
2.6 PPO
Understanding: the critic is updated in the same way as in actor-critic. Its loss compares the estimated value against the target value, takes the mean squared error, and backpropagates the gradient.

The actor is updated using the advantage, i.e. how much better each step was than expected. Understanding the loss: the new policy is compared with the old policy through their ratio. When the advantage is large, a large new-to-old ratio leads to a larger update. The second term subtracts a KL penalty, which measures how far the new policy has drifted from the old one; if they differ too much, this acts as a penalty on the loss and limits how far the new policy can move in a single update. A sketch of this loss is given below.
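A minimal, self-contained sketch of the loss described above (the KL-penalty form of PPO). All tensors and the `beta` coefficient are illustrative placeholders of my own, not taken from the post's code:

```python
import torch
import torch.nn.functional as F

# placeholder batch data standing in for a collected rollout
log_prob_new = torch.randn(32, requires_grad=True)    # log pi_new(a|s)
log_prob_old = torch.randn(32)                        # log pi_old(a|s), treated as fixed
advantage = torch.randn(32)                           # estimated advantage per step
v_pred = torch.randn(32, requires_grad=True)          # critic's value estimates
td_target = torch.randn(32)                           # targets the critic regresses toward
beta = 0.01                                           # KL penalty coefficient

ratio = torch.exp(log_prob_new - log_prob_old)        # new-to-old policy ratio
kl = (log_prob_old - log_prob_new).mean()             # crude sample estimate of KL(old || new)

actor_loss = -(ratio * advantage).mean() + beta * kl  # drifting too far from the old policy is penalized
critic_loss = F.mse_loss(v_pred, td_target)           # estimated value vs. target value, as in actor-critic
(actor_loss + critic_loss).backward()
```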
Code notes

load_state_dict: copies the weights of the online network q into the target network q_target, i.e. a hard update of the target network (in contrast to the gradual soft_update used in DDPG above).

```python
q_target.load_state_dict(q.state_dict())
```