强化学习的数学原理及 Python 实现

2020-12-24 14:24

策略梯度法

$$\theta_{s_i,a_j}=\theta_{s_i,a_j}+\eta*\Delta\theta_{s_i,a}$$

梯度下降法,$$\theta_{s_i,a_j}$$ 表示在状态 $$s_i$$ 下采取动作 $$a_j$$ 的概率

$$\Delta\theta_{s_i,a}=\frac{N(s_i,a_j)-P(s_i,a_j)*N(s_i,a)}{T}$$

$$N(s_i,a_j)$$ 表示实际在状态 $$s_i$$ 下采取动作 $$a_j$$ 的次数

$$P(s_i,a_j)$$ 表示预测在状态 $$s_i$$ 下采取动作 $$a_j$$ 的概率

$$N(s_i,a)$$ 表示实际在状态 $$s_i$$ 下采取所有动作的次数

$$T$$ 表示总步数

        maze=[
          [1, 2, 3],
          [4, 5, 6],
          [7, 8, 9]
        ] # 从 1 出发 到 9 结束,墙参考 theta

        theta=[
          [0,   0.5, 0.5, 0] # 1 的初始行动(上,右,下,左)概率,为 0 时表示有墙
          [0,   0.5, 0,   0.5]
          [0,   0,   0.5, 0.5]
          [0.3, 0.3, 0.3, 0]
          [0,   0,   0.5, 0.5]
          [1,   0,   0,   0]
          [1,   0,   0,   0]
          [0.5, 0.5, 0,   0] # 8 的初始行动概率
        ]

        def get_next_s(s,a):
          if a==0 return s-3
          if a==1 return s+1
          if a==2 return s+3
          if a==3 return s-1

        def get_next_a(s):
          return np.random.choice([0,1,2,3],p=theta[s,:]) # 根据数组中的概率 随机选出一个

        s_a_history=[]

        s=0
        a=get_next_a(s) # 假设 a=1
        while True:
          s_a_history.append([s,a])

          next_s=get_next_s(s,a) # next_s=1
          next_a=get_next_a(s_next) # next_a=1

          if next_s==8:
            break

          s=next_s
          a=next_a

        # 假设在一次尝试中

        T=len([
          [0, 2],
          [3, 0],
          [0, 1],
          [1, 3],
          [0, 2],
          [3, 1],
          [4, 2],
          [7, 1],
          [8, nan]
        ]) # 9=len(s_a_history)

        # 假设 s_i=0, s_j=2
        N_ij=len([
          [0,2],
          [0,2]
        ]) # 2
        N_i=len([
          [0,2],
          [0,1],
          [0,2]
        ]) # 3

        eta=0.1
        theta[0,2]=theta[0,2]+eta*((N_ij-theta[0,2]*N_i)/T) # 0.505=0.5+0.1*((2-0.5*3)/9)
      

贝尔曼方程

$$V^\pi(s)=\max \limits_a\sum{R_{s,a}+\gamma*V^\pi(s(s,a))}$$

$$V^\pi(s)$$ 表示状态 s 的最大价值

$$R_{s,a}$$ 表示在状态 s 下采取动作 a 的奖励

$$\gamma$$ 表示时间折扣率

$$V^\pi(s(s,a))$$ 表示在状态 s 下采取动作 a 后状态 s+1 的价值

Sarsa 算法

贝尔曼变形:$$Q(s_t,a_t)=R_{t+1}+\gamma*Q(s_{t+1},a_{t+1})$$

等式变形:$$TD=R_{t+1}+\gamma*Q(s_{t+1},a_{t+1})-Q(s_t,a_t)$$,TD 为 0 时求出解

$$Q(s_t,a_t)=Q(s_t,a_t)+\eta*TD$$

        # maze 略
        # Q 同 theta
        # get_next_s(s,a) 略

        def get_next_a(s):
          return np.argmax(Q[s,:]) # 数组中最大值的索引

        gamma=0.9
        eta=0.1

        s=0
        a=get_next_a(s) # 假设 a=1
        while True:
          next_s=get_next_s(s,a) # next_s=1
          next_a=get_next_a(s_next) # next_a=1

          if next_s==8:
            reward=1
          else:
            reward=0

          Q[s,a]=Q[s,a]+eta*(reward+gamma*Q[next_s,next_a]-Q[s,a]) # 0.495=0.5+0.1*(0+0.9*0.5-0.5)

          s=next_s
          a=next_a
      

Q 算法

Sarsa 变形:$$TD=R_{t+1}+\gamma*\max \limits_aQ(s_{t+1},a)-Q(s_t,a_t)$$

        # 前略

        Q[s,a]=Q[s,a]+eta*(reward+gamma*np.max(Q[next_s,:])-Q[s,a]) # 0.495=0.5+0.1*(0+0.9*0.5-0.5)

        # 后略
      

DQN

        # maze 略
        # get_next_s(s,a) 略

        memory=[]

        def get_next_a(s):
          # 根据 nn 得到 s 状态下最大的 a
          return

        def update_Q():
          # 从 memory 中随机获取一批数据
          # 根据 nn 得到 Q(s_t,a_t) 和 max{Q(s_t+1,a)}
          # 根据 Q 算法得到 TD,TD = (reward + gamma * max{Q(s_t+1,a)}) - Q(s_t,a_t)
          # 使用 TD 反向传播更新 nn 参数

        s = 0
        while True:
          next_a = get_next_a(s)
          next_s = get_next_s(s,next_a)

          if next_s==8:
            reward=1
          else:
            reward=0

          memory.push([s,next_a,next_s,reward])

          update_Q()

          s = next_s
      

DDQN

$$Q_m(s_t,a_t)=Q_m(s_t,a_t)+\eta*(R_{t+1}+\gamma*\max \limits_aQ_t(s_{t+1},a)-Q_m(s_t,a_t))$$

使用 $$a_m=arg\max \limits_aQ_m(s_{t+1},a)$$ 优化:

$$Q_m(s_t,a_t)=Q_m(s_t,a_t)+\eta*(R_{t+1}+\gamma*Q_t(s_{t+1},a_m)-Q_m(s_t,a_t))$$

$$Q_m$$ 为主网络,$$Q_t$$为目标网络,使用主网络的参数覆盖目标网络的参数

        # 前略

        def get_next_a(s):
          # 根据主 nn 得到 s 状态下最大的 a
          return

        def update_main_Q():
          # 从 memory 中随机获取一批数据
          # 根据主 nn 得到 Q(s_t,a_t)
          # 从主 nn 得到 a_m=argmax{Q(s_t+1,a)}
          # 从目标 nn 得到 Q(s_t+1,a_m)
          # 根据 Q 算法得到 TD,TD = (reward + gamma * Q(s_t+1,a_m)) - Q(s_t,a_t)
          # 使用 TD 反向传播更新主 nn 参数
          return

        def update_target_Q():
          # 目标 nn 的参数 = 主 nn 的参数
          return

        s = 0
        while True:
          next_a = get_next_a(s)
          next_s = get_next_s(s,next_a)

          if next_s==8:
            reward=1
            update_target_Q()
          else:
            reward=0

          memory.push([s,next_a,next_s,reward])

          update_main_Q()

          s = next_s
      

DuelingNetwork

Advantage:$$A(s,a_0)=Q(s,a_0)-V(s)$$

价值:$$V(s)$$

$$Q(s,a_0)=V(s)+Adv(s,a_0)-\frac{Adv(s,a_0)+Adv(s,a_1)+Adv(s,a_2)+Adv(s,a_3)}{4}$$

        # 原 nn
        model.add_module('fc1', nn.Linear(n_in, n_mid))
        model.add_module('relu1', nn.ReLU())
        model.add_module('fc2', nn.Linear(n_mid, n_mid))
        model.add_module('relu2', nn.ReLU())
        model.add_module('fc3', nn.Linear(n_mid, n_out))
        # 输出 Q_s_0 Q_s_1 Q_s_2 Q_s_3

        # DuelingNetwork
        model.add_module('fc1', nn.Linear(n_in, n_mid))
        model.add_module('relu1', nn.ReLU())
        model.add_module('fc2', nn.Linear(n_mid, n_mid))
        model.add_module('relu2', nn.ReLU())
        model.add_module('fc3_adv', nn.Linear(n_mid, n_out))
        model.add_module('', )
        model.add_module('fc3_v', nn.Linear(n_mid, 1))
        model.add_module('', )
        # 输出 V_s A_s_0 A_s_1 A_s_2 A_s_3
      

优先经验回放

        # 前略
        # td_memory=[td,memory]

        def update_td():
          # TD 同 DDQN
          return

        def update_main_Q():
          # 从 td_memory 中获取 td 最大的一批数据
          # 后略
          return

        s = 0
        while True:
          next_a = get_next_a(s)
          next_s = get_next_s(s,next_a)

          if next_s==8:
            reward=1
            update_td()
            update_target_Q()
          else:
            reward=0

          memory.push([s,next_a,next_s,reward])

          update_main_Q()

          s = next_s
      

A2C (Advanced Actor-Critic)

使用 2 步来更新 Q:$$Q(s_t,a_t)=R_{t+1}+\gamma*R_{t+2}+\gamma^2*\max \limits_aQ(s_{t+2},a)$$

$$J(\theta,s_t)=\mathbb E[\log\pi_\theta(a|s)*(Q^\pi(s,a)-V^\pi_s)]$$

$$Actor_{entropy}=-\mathbb E[\sum{\pi_\theta(a|s)*\log\pi_\theta(a|s)}]$$

$$loss_{Critic}=\mathbb E[(Q^\pi(s,a)-V^\pi_s)^2]$$

$$\mathbb E$$ 表示求平均值

$$\pi_\theta(a|s)$$ 表示在状态 s 下采取动作 a 的概率

$$\log\pi_\theta(a|s)$$ 表示在状态 s 下采取动作 a 的概率的对数

$$Q^\pi(s,a)$$ 表示在状态 s 下采取动作 a 的价值

$$V^\pi_s$$ 表示状态 s 的价值

        # 状态 state, 行动 action, 奖励 reward
        # actor, value=model(state)
        #
        # actor_gain=(logsoftmax(actor).gather(action) * (reward-value).detach()).mean()
        # entropy=-(softmax(actor) * logsoftmax(actor)).sum().mean()
        # value_loss=(reward-value).pow(2).mean()