基于强化学习动态避障的Python实现(绝赞摸鱼版)

基于强化学习动态避障的Python实现

吐槽在前

这是我的研究生小课题,可是老师从头到尾没有理过我,只给了我一个题目,连稍微具体一点的要求都没提。那我就摸鱼摸爆

于是我进行了许多的简化,到最后做出了一个网格世界(GridWorld)的环境模型,在5*5的网格世界中,用一个格子表示我们的Agent,四个格子表示障碍(两个动态障碍两个静态障碍),一个格子表示目的地。(想法来自于Matlab的强化学习工具箱

参考文章

主要参考这位大佬。
TA整合了另外两位大佬的文章:
https://blog.csdn.net/extremebingo/article/details/80867486
https://blog.csdn.net/gg_18826075157/article/details/78163386

环境

VS Code下:
Python 3.8
gym 忘记是多少,但是无所谓啦只要不是远古版本肯定没问题

训练环境设置

大佬的基础上,将世界大小扩充至5*5,并加入两个会来回移动的障碍物,大致如图:

上图是初始状态下的环境,我们的agent在左上角出发,目的地设置为右下角,黑色的是静态障碍,它们不会随着时间移动,红色的是动态障碍,它们的移动速度和agent一样。我们把左边的动态障碍称为动态障碍1,右边的动态障碍称为动态障碍2。动态障碍1会沿着当前列上下来回移动,动态障碍2则只在当前位置到左边两格的范围来回移动,如图:

每个训练episode中,agent在每一个step都可以选择四个方向行进,撞墙了或者走的步数超过了20步则终止该次训练。

环境代码__init__(self):

用一个1×25的list来表示5×5的矩阵,从左上到右下分别表示为0~24(states),如此一来,我们的agent从0出发,目的地是24,静态障碍的位置分别为10和19,动态障碍1起始位置为21,动态障碍2起始位置为14。在该环境下,上走为-5,下为 5,左为-1,右为 1(actions)。在环境的class的__init__(self):下则有:

#------------------Initial states and directions--------------#self.Terminal = 24      # 5*5 space represented by 1*25 matrix: 0 ~ 24self.state = 0self.StaticObs1 = 10self.StaticObs2 = 19self.DynamicObs1 = 21self.DynamicObs2 = 14self.actions = [0, 1, 2, 3]     # Direction: Up_0, Down_1, Left_2, Right_3self.Obs1Dir = 0self.Obs2Dir = 2self.gamma = 0.8self.viewer = None#-------------------------------------------------------------##------------------Transformation Dictionary------------------#self.size = 5self.T = dict()for i in range(self.size, self.size * self.size):self.T[str(i)   '_0'] = i - 5   # States that can go upfor i in range(self.size * (self.size - 1)):    self.T[str(i)   '_1'] = i   5   # States that can go downfor i in range(1, self.size * self.size):    if i % self.size == 0:        continue    self.T[str(i)   '_2'] = i - 1   # States that can go left          for i in range(self.size * self.size):    if (i   1) % self.size == 0:        continue    self.T[str(i)   '_3'] = i   1   # States that can go right#-------------------------------------------------------------#

在这里,我觉得有一个很精妙的地方,大佬们把状态states和行为actions用拼接的方法拼到了一起,组合成了state_action这样的key,以调用行动结果以及查找该state下做这个action的奖励值(reward):

#--------------------------Rewards----------------------------#self.Rewards = dict()   # Keys are combined with 'states' and '_actions'self.Rewards[str(self.DynamicObs1 - 5)   '_1'] = -200.0self.Rewards[str(self.DynamicObs1 - 1)   '_3'] = -200.0self.Rewards[str(self.DynamicObs1   1)   '_2'] = -200.0self.Rewards[str(self.DynamicObs2 - 5)   '_1'] = -200.0self.Rewards[str(self.DynamicObs2 - 1)   '_3'] = -200.0self.Rewards[str(self.DynamicObs2   5)   '_0'] = -200.0self.Rewards[str(self.StaticObs1 - 5)   '_1'] = -200.0self.Rewards[str(self.StaticObs1   1)   '_2'] = -200.0self.Rewards[str(self.StaticObs1   5)   '_0'] = -200.0self.Rewards[str(self.StaticObs2 - 5)   '_1'] = -200.0self.Rewards[str(self.StaticObs2 - 1)   '_3'] = -200.0self.Rewards[str(self.StaticObs2   5)   '_0'] = -200.0self.Rewards[str(self.Terminal - 5)   '_1'] = 100.0self.Rewards[str(self.Terminal - 1)   '_3'] = 100.0#-------------------------------------------------------------#

撞到障碍物或目的地则该次训练结束:

#-----------------Episodes stopping criterion-----------------#self.TerminateStates = dict()      # When crash into obstacles or arrive terminalself.TerminateStates[self.StaticObs1] = 1self.TerminateStates[self.StaticObs2] = 1self.TerminateStates[self.DynamicObs1] = 1self.TerminateStates[self.DynamicObs2] = 1self.TerminateStates[self.Terminal] = 1#-------------------------------------------------------------#

为了让结果可视化,我们需要自己渲染结果,比如我打算设置一个700×700的窗口,那么,每一格的中心的横坐标为[150, 250, 350, 450, 550]重复5次(因为是一个1×25的list,每5个为环境的一行),相应地,纵坐标为150,250,350,450,550分别重复5次。

#------------------Coordinates for ploting--------------------#self.x = [150,250,350,450,550] * 5self.y = [550] * 5   [450] * 5   [350] * 5   [250] * 5   [150] * 5#-------------------------------------------------------------#

以上,是环境初始化的__init__(self)函数的主要内容
接下来,是强化学习的关键,step函数和reset函数。

环境变化设置step(self, action):

step函数是会在主函数里疯狂调用的函数,它揭示了该环境下的一切变化。即在一个训练episode里,每进行一步,agent采取行动导致的变化,障碍如何移动以及障碍移动导致reward字典的变化都应该包含在step函数里。具体的思路是:在主函数里决定采取的action,然后传入step,在step里进行了障碍的移动和reward的更新后,判断该次行动带来的reward,以及此次训练是否需要结束等。故此函数需要返回的东西有:agent采取行动后的状态,奖励值,是否结束此次训练的flag。

首先,我们要判断agent是否碰上了障碍,我能想到的有两种情况:

  1. 在上一步中,agent和障碍已经靠在一起,而在采取行动后,障碍和agent的位置互换了。这种情况就是发生了正碰,迎头装上。
  2. 在上一步中,agent和障碍没有靠一起,但是它们在这一步过后,位置重叠了。这种情况就是发生了90°的碰撞,比如一个从下面来,一个从左边来,跑到了同一个地方。

所以我们需要记录障碍在行动前和行动后的两个状态,在环境的step(self,action):中则有:

self.temp = dict()self.temp[self.DynamicObs1] = 1self.temp[self.DynamicObs2] = 1# Update terminate statesself.TerminateStates.pop(self.DynamicObs1)self.TerminateStates.pop(self.DynamicObs2)if self.Obs1Dir == 0:    if self.DynamicObs1 == 1:        self.Obs1Dir = 1        self.DynamicObs1  = 5    else:        self.DynamicObs1 -= 5else:    if self.DynamicObs1 == 21:        self.DynamicObs1 -= 5        self.Obs1Dir = 0    else:        self.DynamicObs1  = 5if self.Obs2Dir == 2:    if self.DynamicObs2 == 12:        self.DynamicObs2  = 1        self.Obs2Dir = 3    else:        self.DynamicObs2 -= 1else:    if self.DynamicObs2 == 14:        self.Obs2Dir = 2        self.DynamicObs2 -= 1    else:        self.DynamicObs2  = 1self.TerminateStates[self.DynamicObs1] = 1self.TerminateStates[self.DynamicObs2] = 1# Update rewards dictionaryself.Rewards = dict()if self.DynamicObs1 == 21:    self.Rewards[str(self.DynamicObs1 - 5)   '_1'] = -200.0    self.Rewards[str(self.DynamicObs1 - 1)   '_3'] = -200.0    self.Rewards[str(self.DynamicObs1   1)   '_2'] = -200.0elif self.DynamicObs1 == 1:    self.Rewards[str(self.DynamicObs1   5)   '_0'] = -200.0    self.Rewards[str(self.DynamicObs1 - 1)   '_3'] = -200.0    self.Rewards[str(self.DynamicObs1   1)   '_2'] = -200.0else:    self.Rewards[str(self.DynamicObs1 - 5)   '_1'] = -200.0    self.Rewards[str(self.DynamicObs1 - 1)   '_3'] = -200.0    self.Rewards[str(self.DynamicObs1   1)   '_2'] = -200.0    self.Rewards[str(self.DynamicObs1   5)   '_0'] = -200.0if self.DynamicObs2 == 14:    self.Rewards[str(self.DynamicObs2 - 5)   '_1'] = -200.0    self.Rewards[str(self.DynamicObs2 - 1)   '_3'] = -200.0    self.Rewards[str(self.DynamicObs2   5)   '_0'] = -200.0else:    self.Rewards[str(self.DynamicObs2 - 5)   '_1'] = -200.0    self.Rewards[str(self.DynamicObs2 - 1)   '_3'] = -200.0    self.Rewards[str(self.DynamicObs2   1)   '_2'] = -200.0    self.Rewards[str(self.DynamicObs2   5)   '_0'] = -200.0self.Rewards[str(self.StaticObs1 - 5)   '_1'] = -200.0self.Rewards[str(self.StaticObs1   1)   '_2'] = -200.0self.Rewards[str(self.StaticObs1   5)   '_0'] = -200.0self.Rewards[str(self.StaticObs2 - 5)   '_1'] = -200.0self.Rewards[str(self.StaticObs2 - 1)   '_3'] = -200.0self.Rewards[str(self.StaticObs2   5)   '_0'] = -200.0self.Rewards[str(self.Terminal - 5)   '_1'] = 100.0self.Rewards[str(self.Terminal - 1)   '_3'] = 100.0

至此,我们做好了一步之下,障碍的运动及记录和reward的更新。

接下来,我们要根据传入的action,来判断agent采取该action所得的reward:判断是否撞墙,判断是否撞障碍,判断是否接近了目标等。

state = self.statekey = "%d_%d"%(state,action)# Dectect whether this action will lead to crashing into the wallif key in self.T:    next_state = self.T[key]else:    next_state = state    r = -200.0    is_Done = True    return next_state, r, is_Done, {}# Dectect whether this action will lead to crashing into the obstaclesself.state = next_state     # Update stateis_Done = Falseif next_state in self.TerminateStates or (next_state in self.temp and state in self.TerminateStates):    is_Done = Trueif key not in self.Rewards:    if (self.Terminal - next_state) < (self.Terminal - state):        r = 20.0    else:        r = -50.0else:    r = self.Rewards[key]return next_state, r, is_Done, {}

至此,step函数完成。

在这里我想多说一个我这样的小白容易犯的错误——一开始的时候,我几乎只给出惩罚reward没有给出奖励reward,这导致agent在尝试不多次都看不见终点的奖励以后,直接走进自闭,宁愿开局撞墙,拿个-200分,也不愿意再去尝试别的路子,因为我给它设定的世界里全是惩罚,只有看不见的终点有奖励。agent: 捏马,机生无望啊555 agent拿最高分的办法就是开局自鲨

环境变化设置reset(self):

reset函数也很重要,但是没啥好说的,就是充值一下各单位位置以及reward字典,直接在__init__(self):里复制粘贴就好

# Reset states and directionsself.Terminal = 24self.state = 0self.StaticObs1 = 10self.StaticObs2 = 19self.DynamicObs1 = 21self.DynamicObs2 = 14self.actions = [0, 1, 2, 3]self.Obs1Dir = 0self.Obs2Dir = 2self.gamma = 0.8# self.viewer = None# Reset episodes stopping criterionself.TerminateStates = dict()self.TerminateStates[self.StaticObs1] = 1self.TerminateStates[self.StaticObs2] = 1self.TerminateStates[self.DynamicObs1] = 1self.TerminateStates[self.DynamicObs2] = 1self.TerminateStates[self.Terminal] = 1# Reset rewards dictionaryself.Rewards = dict()self.Rewards[str(self.DynamicObs1 - 5)   '_1'] = -200.0self.Rewards[str(self.DynamicObs1 - 1)   '_3'] = -200.0self.Rewards[str(self.DynamicObs1   1)   '_2'] = -200.0self.Rewards[str(self.DynamicObs2 - 5)   '_1'] = -200.0self.Rewards[str(self.DynamicObs2 - 1)   '_3'] = -200.0self.Rewards[str(self.DynamicObs2   5)   '_0'] = -200.0self.Rewards[str(self.StaticObs1 - 5)   '_1'] = -200.0self.Rewards[str(self.StaticObs1   1)   '_2'] = -200.0self.Rewards[str(self.StaticObs1   5)   '_0'] = -200.0self.Rewards[str(self.StaticObs2 - 5)   '_1'] = -200.0self.Rewards[str(self.StaticObs2 - 1)   '_3'] = -200.0self.Rewards[str(self.StaticObs2   5)   '_0'] = -200.0self.Rewards[str(self.Terminal - 5)   '_1'] = 100.0self.Rewards[str(self.Terminal - 1)   '_3'] = 100.0return self

环境渲染设置render(self, mode = ‘human’):

这里应该没啥好说的,就是单纯的画图,认真看一下都能明白咋整的

from gym.envs.classic_control import renderingscreen_width = 700screen_height = 700if self.viewer is None:    self.viewer = rendering.Viewer(screen_width,screen_height)    # Plot the GridWorld    self.line1 = rendering.Line((100,100),(600,100))    self.line2 = rendering.Line((100, 200), (600, 200))    self.line3 = rendering.Line((100, 300), (600, 300))    self.line4 = rendering.Line((100, 400), (600, 400))    self.line5 = rendering.Line((100, 500), (600, 500))    self.line6 = rendering.Line((100, 600), (600, 600))    self.line7 = rendering.Line((100, 100), (100, 600))    self.line8 = rendering.Line((200, 100), (200, 600))    self.line9 = rendering.Line((300, 100), (300, 600))    self.line10 = rendering.Line((400, 100), (400, 600))    self.line11 = rendering.Line((500, 100), (500, 600))    self.line12 = rendering.Line((600, 100), (600, 600))    # Plot dynamic obstacle_1    self.obs1 = rendering.make_circle(40)    self.obs1trans = rendering.Transform()    # translation=(250, 150)    self.obs1.add_attr(self.obs1trans)    self.obs1.set_color(1, 0, 0)    # Plot dynamic obstacle_2    self.obs2 = rendering.make_circle(40)    self.obs2trans = rendering.Transform()    self.obs2.add_attr(self.obs2trans)    self.obs2.set_color(1, 0, 0)    # Plot static obstacle_1    self.obstacle_1 = rendering.make_circle(40)    self.obstacle1trans = rendering.Transform()    self.obstacle_1.add_attr(self.obstacle1trans)    self.obstacle_1.set_color(0, 0, 0)    # Plot static obstacle_2    self.obstacle_2 = rendering.make_circle(40)    self.obstacle2trans = rendering.Transform()    self.obstacle_2.add_attr(self.obstacle2trans)    self.obstacle_2.set_color(0, 0, 0)    # Plot Terminal    self.terminal = rendering.make_circle(40)    self.circletrans = rendering.Transform(translation=(550, 150))    self.terminal.add_attr(self.circletrans)    self.terminal.set_color(0, 0, 1)    # Plot robot    self.robot= rendering.make_circle(30)    self.robotrans = rendering.Transform()    self.robot.add_attr(self.robotrans)    self.robot.set_color(0, 1, 0)    self.line1.set_color(0, 0, 0)    self.line2.set_color(0, 0, 0)    self.line3.set_color(0, 0, 0)    self.line4.set_color(0, 0, 0)    self.line5.set_color(0, 0, 0)    self.line6.set_color(0, 0, 0)    self.line7.set_color(0, 0, 0)    self.line8.set_color(0, 0, 0)    self.line9.set_color(0, 0, 0)    self.line10.set_color(0, 0, 0)    self.line11.set_color(0, 0, 0)    self.line12.set_color(0, 0, 0)    self.viewer.add_geom(self.line1)    self.viewer.add_geom(self.line2)    self.viewer.add_geom(self.line3)    self.viewer.add_geom(self.line4)    self.viewer.add_geom(self.line5)    self.viewer.add_geom(self.line6)    self.viewer.add_geom(self.line7)    self.viewer.add_geom(self.line8)    self.viewer.add_geom(self.line9)    self.viewer.add_geom(self.line10)    self.viewer.add_geom(self.line11)    self.viewer.add_geom(self.line12)    self.viewer.add_geom(self.obs1)    self.viewer.add_geom(self.obs2)    self.viewer.add_geom(self.obstacle_1)    self.viewer.add_geom(self.obstacle_2)    self.viewer.add_geom(self.terminal)    self.viewer.add_geom(self.robot)if self.state is None:    return Noneself.robotrans.set_translation(self.x[self.state], self.y[self.state])self.obs1trans.set_translation(self.x[self.DynamicObs1], self.y[self.DynamicObs1])self.obs2trans.set_translation(self.x[self.DynamicObs2], self.y[self.DynamicObs2])self.obstacle1trans.set_translation(self.x[self.StaticObs1], self.y[self.StaticObs1])self.obstacle2trans.set_translation(self.x[self.StaticObs2], self.y[self.StaticObs2])return self.viewer.render(return_rgb_array=mode == 'rgb_array')

环境关闭close(self):

if self.viewer:    self.viewer.close()    self.viewer = None

至此,训练环境相关的设置已经彻底完成!

接下来,训练过程的代码和大佬们的大同小异。

基于Q-learning和Epsilon-greedy训练

值得一提的地方是:在训练过程中,判断本次训练结束后,我们需要在大佬的代码里再添加一个判断:如果不是因为到达终点而终止的训练的话,直接给一个-200.0的reward并更新q_table防止卡在一种死法上 。而更新q_table的步骤在action决策函数里面,故在if Done:下添加:

if observation != 24:    action, state = get_action(state, action, observation, -200.0, episode, 0.5)

以及删除下面一句我也没看懂的episode_reward = -100
其它的和大佬的代码基本一样啦!

来源:https://www.icode9.com/content-1-800751.html

(0)

相关推荐