A Python Implementation of Dynamic Obstacle Avoidance with Reinforcement Learning (Gloriously Lazy Edition)
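A rant up front
This is my small graduate-school project, but my advisor ignored me from start to finish. I was handed a title and nothing else, not even a slightly more specific requirement. So I decided to slack off as hard as I could.
I therefore simplified a lot and ended up with a GridWorld environment model: in a 5×5 grid world, one cell represents our agent, four cells represent obstacles (two dynamic and two static), and one cell represents the destination. (The idea comes from MATLAB's Reinforcement Learning Toolbox.)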
References
I mainly followed this expert's post.
They combined the articles of two other experts:
https://blog.csdn.net/extremebingo/article/details/80867486
https://blog.csdn.net/gg_18826075157/article/details/78163386
Environment
Under VS Code:
Python 3.8
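gym: I forget which version, but it hardly matters as long as it is not an ancient one. (One caveat: render() below imports gym.envs.classic_control.rendering, which recent gym releases no longer ship, so a very new version will not work either.)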
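Setting up the training environment
Building on the experts' work, I enlarged the world to 5×5 and added two obstacles that move back and forth, roughly as shown in the figure:
The figure above shows the environment in its initial state. Our agent starts in the top-left corner and the destination is the bottom-right corner. The black cells are static obstacles, which never move. The red cells are dynamic obstacles, which move at the same speed as the agent (one cell per step). We call the dynamic obstacle on the left dynamic obstacle 1 and the one on the right dynamic obstacle 2. Dynamic obstacle 1 moves up and down along its column, while dynamic obstacle 2 only moves back and forth between its starting cell and the cell two to its left, as shown in the figure:
In each training episode the agent can move in one of four directions at every step. The episode ends if the agent hits a wall or takes more than 20 steps.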
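Environment code: __init__(self)
A 1×25 list represents the 5×5 matrix, with the cells numbered 0 to 24 from top-left to bottom-right (the states). Our agent therefore starts at 0, the destination is 24, the static obstacles sit at 10 and 19, dynamic obstacle 1 starts at 21, and dynamic obstacle 2 starts at 14. In this environment, moving up is -5, down is +5, left is -1, and right is +1 (the actions). Inside the environment class's __init__(self) we then have: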
    #------------------Initial states and directions--------------#
    self.Terminal = 24  # 5*5 space represented by 1*25 matrix: 0 ~ 24
    self.state = 0
    self.StaticObs1 = 10
    self.StaticObs2 = 19
    self.DynamicObs1 = 21
    self.DynamicObs2 = 14
    self.actions = [0, 1, 2, 3]  # Direction: Up_0, Down_1, Left_2, Right_3
    self.Obs1Dir = 0
    self.Obs2Dir = 2
    self.gamma = 0.8
    self.viewer = None
    #-------------------------------------------------------------#
    #------------------Transformation Dictionary------------------#
    self.size = 5
    self.T = dict()
    for i in range(self.size, self.size * self.size):
        self.T[str(i) + '_0'] = i - 5  # States that can go up
    for i in range(self.size * (self.size - 1)):
        self.T[str(i) + '_1'] = i + 5  # States that can go down
    for i in range(1, self.size * self.size):
        if i % self.size == 0:
            continue
        self.T[str(i) + '_2'] = i - 1  # States that can go left
    for i in range(self.size * self.size):
        if (i + 1) % self.size == 0:
            continue
        self.T[str(i) + '_3'] = i + 1  # States that can go right
    #-------------------------------------------------------------#
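As a cross-check on the transition dictionary, the same moves can be computed arithmetically from row and column. This is only a sketch: `try_move` and `ACTION_DELTAS` are names made up for illustration and are not part of the original environment.

```python
SIZE = 5  # the grid is SIZE x SIZE, flattened row by row into 0..24

# Hypothetical helper table: action id -> (row change, column change)
ACTION_DELTAS = {0: (-1, 0), 1: (1, 0), 2: (0, -1), 3: (0, 1)}  # up, down, left, right


def try_move(state, action, size=SIZE):
    """Return the next flat index, or None if the move would leave the grid."""
    row, col = divmod(state, size)
    d_row, d_col = ACTION_DELTAS[action]
    new_row, new_col = row + d_row, col + d_col
    if 0 <= new_row < size and 0 <= new_col < size:
        return new_row * size + new_col
    return None


# Rebuild a "state_action" -> next_state table with the same key format as self.T.
T = {}
for s in range(SIZE * SIZE):
    for a in ACTION_DELTAS:
        nxt = try_move(s, a)
        if nxt is not None:
            T["%d_%d" % (s, a)] = nxt

print(len(T))       # number of legal (state, action) pairs
print(T["0_1"])     # moving down from the top-left corner
print("0_0" in T)   # moving up from the top-left corner is a wall hit
```

A key that is missing from the table is exactly what step() later treats as hitting the wall.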
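There is one thing here that I find really clever: the experts concatenate states and actions into a single key of the form state_action, which is used both to look up the result of an action and to look up the reward for taking that action in that state: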
    #--------------------------Rewards----------------------------#
    self.Rewards = dict()  # Keys are combined with 'states' and '_actions'
    self.Rewards[str(self.DynamicObs1 - 5) + '_1'] = -200.0
    self.Rewards[str(self.DynamicObs1 - 1) + '_3'] = -200.0
    self.Rewards[str(self.DynamicObs1 + 1) + '_2'] = -200.0
    self.Rewards[str(self.DynamicObs2 - 5) + '_1'] = -200.0
    self.Rewards[str(self.DynamicObs2 - 1) + '_3'] = -200.0
    self.Rewards[str(self.DynamicObs2 + 5) + '_0'] = -200.0
    self.Rewards[str(self.StaticObs1 - 5) + '_1'] = -200.0
    self.Rewards[str(self.StaticObs1 + 1) + '_2'] = -200.0
    self.Rewards[str(self.StaticObs1 + 5) + '_0'] = -200.0
    self.Rewards[str(self.StaticObs2 - 5) + '_1'] = -200.0
    self.Rewards[str(self.StaticObs2 - 1) + '_3'] = -200.0
    self.Rewards[str(self.StaticObs2 + 5) + '_0'] = -200.0
    self.Rewards[str(self.Terminal - 5) + '_1'] = 100.0
    self.Rewards[str(self.Terminal - 1) + '_3'] = 100.0
    #-------------------------------------------------------------#
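The hand-written keys above follow one rule: any move that lands on an obstacle costs -200, and any move that lands on the destination earns +100. A minimal sketch of deriving the dictionary from that rule is below. It is not the code I actually used; `build_transitions` and `build_rewards` are hypothetical helpers, and the obstacle list holds the initial positions only.

```python
SIZE = 5
TERMINAL = 24
OBSTACLES = [10, 19, 21, 14]  # static 1, static 2, dynamic 1, dynamic 2 (initial cells)


def build_transitions(size=SIZE):
    """Legal moves only, keyed as 'state_action' in the same style as self.T."""
    offsets = {0: -size, 1: size, 2: -1, 3: 1}  # up, down, left, right
    table = {}
    for s in range(size * size):
        row, col = divmod(s, size)
        legal = {0: row > 0, 1: row < size - 1, 2: col > 0, 3: col < size - 1}
        for a, ok in legal.items():
            if ok:
                table["%d_%d" % (s, a)] = s + offsets[a]
    return table


def build_rewards(transitions, obstacles, terminal):
    """-200 for a move that lands on an obstacle, +100 for one that lands on the goal."""
    rewards = {}
    for key, nxt in transitions.items():
        if nxt in obstacles:
            rewards[key] = -200.0
        elif nxt == terminal:
            rewards[key] = 100.0
    return rewards


rewards = build_rewards(build_transitions(), OBSTACLES, TERMINAL)
print(len(rewards))                      # 14, the same number of keys as the list above
print(rewards["16_1"], rewards["23_3"])  # -200.0 100.0
```

For the initial layout this produces the same 14 keys as the explicit assignments, and it handles edge cells automatically because moves off the grid never appear in the transition table.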
Hitting an obstacle or reaching the destination ends the episode:
    #-----------------Episodes stopping criterion-----------------#
    self.TerminateStates = dict()  # When crash into obstacles or arrive terminal
    self.TerminateStates[self.StaticObs1] = 1
    self.TerminateStates[self.StaticObs2] = 1
    self.TerminateStates[self.DynamicObs1] = 1
    self.TerminateStates[self.DynamicObs2] = 1
    self.TerminateStates[self.Terminal] = 1
    #-------------------------------------------------------------#
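To visualize the result we have to render it ourselves. Say I set up a 700×700 window. Then the x-coordinates of the cell centers are [150, 250, 350, 450, 550] repeated 5 times (it is a 1×25 list in which every 5 entries form one row of the environment), and the y-coordinates are 550, 450, 350, 250, 150, each repeated 5 times, because the first row is drawn at the top of the window.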
    #------------------Coordinates for plotting-------------------#
    self.x = [150, 250, 350, 450, 550] * 5
    self.y = [550] * 5 + [450] * 5 + [350] * 5 + [250] * 5 + [150] * 5
    #-------------------------------------------------------------#
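The two coordinate lists can also be derived from the cell index. The sketch below assumes 100-pixel cells and a 100-pixel margin, which is what the numbers above imply; `cell_center`, `CELL` and `MARGIN` are illustrative names, not part of the environment.

```python
SIZE = 5
CELL = 100    # cell edge length in pixels (assumed from the 150, 250, ... spacing)
MARGIN = 100  # gap between the window border and the grid


def cell_center(index, size=SIZE):
    """Pixel centre of a flat cell index; row 0 is drawn at the top of the window."""
    row, col = divmod(index, size)
    x = MARGIN + CELL // 2 + col * CELL
    y = MARGIN + CELL // 2 + (size - 1 - row) * CELL
    return x, y


xs = [cell_center(i)[0] for i in range(SIZE * SIZE)]
ys = [cell_center(i)[1] for i in range(SIZE * SIZE)]
print(cell_center(0))   # (150, 550): the agent's start, top-left
print(cell_center(24))  # (550, 150): the destination, bottom-right
```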
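That covers the main content of __init__(self), the environment's initialization function.
Next come the key pieces of reinforcement learning: the step function and the reset function.
Environment dynamics: step(self, action)
step is the function the main program calls over and over, and it describes everything that changes in this environment. Within one training episode, each step must cover the change caused by the agent's action, how the obstacles move, and how the obstacles' movement changes the reward dictionary. The idea is as follows: the main program decides on an action and passes it to step. step first moves the obstacles and updates the rewards, then works out the reward this action brings and whether the episode has to end. The function therefore returns the agent's state after the action, the reward, and a flag saying whether the episode is over.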
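First we have to decide whether the agent has run into an obstacle. I can think of two cases:
- In the previous step the agent and the obstacle were already next to each other, and after the action they have swapped positions. This is a head-on collision.
- In the previous step the agent and the obstacle were not next to each other, but after this step they occupy the same cell. This is a 90° collision: for example, one comes from below, the other from the left, and both end up in the same place.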
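The two cases in the list above can be written as one small predicate. This is a sketch of the idea rather than the check used in step(): `collided` is a hypothetical function, and it pairs each obstacle's old and new cell explicitly, which is slightly stricter than comparing against two sets of cells.

```python
def collided(agent_before, agent_after, obstacles_before, obstacles_after):
    """Check both collision cases for one time step.

    obstacles_before / obstacles_after are lists of flat cell indices,
    given in the same obstacle order.
    """
    for obs_before, obs_after in zip(obstacles_before, obstacles_after):
        # Overlap: agent and obstacle end the step in the same cell.
        if agent_after == obs_after:
            return True
        # Head-on: the two swapped cells during the step.
        if agent_after == obs_before and agent_before == obs_after:
            return True
    return False


# Agent moves 11 -> 6 while an obstacle moves 6 -> 11: head-on.
print(collided(11, 6, [6], [11]))           # True
# Agent moves 7 -> 6 while an obstacle moves 1 -> 6: both arrive at cell 6.
print(collided(7, 6, [1], [6]))             # True
# First step from the start cell, far away from both dynamic obstacles.
print(collided(0, 1, [21, 14], [16, 13]))   # False
```

A static obstacle is just the special case where the old and new cell are equal, so the same function covers it.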
So we need to record the obstacles' positions both before and after they move. In the environment's step(self, action) we then have:
    # Remember where the dynamic obstacles were before they move
    self.temp = dict()
    self.temp[self.DynamicObs1] = 1
    self.temp[self.DynamicObs2] = 1
    # Update terminate states
    self.TerminateStates.pop(self.DynamicObs1)
    self.TerminateStates.pop(self.DynamicObs2)
    if self.Obs1Dir == 0:
        if self.DynamicObs1 == 1:
            self.Obs1Dir = 1
            self.DynamicObs1 += 5
        else:
            self.DynamicObs1 -= 5
    else:
        if self.DynamicObs1 == 21:
            self.DynamicObs1 -= 5
            self.Obs1Dir = 0
        else:
            self.DynamicObs1 += 5
    if self.Obs2Dir == 2:
        if self.DynamicObs2 == 12:
            self.DynamicObs2 += 1
            self.Obs2Dir = 3
        else:
            self.DynamicObs2 -= 1
    else:
        if self.DynamicObs2 == 14:
            self.Obs2Dir = 2
            self.DynamicObs2 -= 1
        else:
            self.DynamicObs2 += 1
    self.TerminateStates[self.DynamicObs1] = 1
    self.TerminateStates[self.DynamicObs2] = 1
    # Update rewards dictionary
    self.Rewards = dict()
    if self.DynamicObs1 == 21:
        self.Rewards[str(self.DynamicObs1 - 5) + '_1'] = -200.0
        self.Rewards[str(self.DynamicObs1 - 1) + '_3'] = -200.0
        self.Rewards[str(self.DynamicObs1 + 1) + '_2'] = -200.0
    elif self.DynamicObs1 == 1:
        self.Rewards[str(self.DynamicObs1 + 5) + '_0'] = -200.0
        self.Rewards[str(self.DynamicObs1 - 1) + '_3'] = -200.0
        self.Rewards[str(self.DynamicObs1 + 1) + '_2'] = -200.0
    else:
        self.Rewards[str(self.DynamicObs1 - 5) + '_1'] = -200.0
        self.Rewards[str(self.DynamicObs1 - 1) + '_3'] = -200.0
        self.Rewards[str(self.DynamicObs1 + 1) + '_2'] = -200.0
        self.Rewards[str(self.DynamicObs1 + 5) + '_0'] = -200.0
    if self.DynamicObs2 == 14:
        self.Rewards[str(self.DynamicObs2 - 5) + '_1'] = -200.0
        self.Rewards[str(self.DynamicObs2 - 1) + '_3'] = -200.0
        self.Rewards[str(self.DynamicObs2 + 5) + '_0'] = -200.0
    else:
        self.Rewards[str(self.DynamicObs2 - 5) + '_1'] = -200.0
        self.Rewards[str(self.DynamicObs2 - 1) + '_3'] = -200.0
        self.Rewards[str(self.DynamicObs2 + 1) + '_2'] = -200.0
        self.Rewards[str(self.DynamicObs2 + 5) + '_0'] = -200.0
    self.Rewards[str(self.StaticObs1 - 5) + '_1'] = -200.0
    self.Rewards[str(self.StaticObs1 + 1) + '_2'] = -200.0
    self.Rewards[str(self.StaticObs1 + 5) + '_0'] = -200.0
    self.Rewards[str(self.StaticObs2 - 5) + '_1'] = -200.0
    self.Rewards[str(self.StaticObs2 - 1) + '_3'] = -200.0
    self.Rewards[str(self.StaticObs2 + 5) + '_0'] = -200.0
    self.Rewards[str(self.Terminal - 5) + '_1'] = 100.0
    self.Rewards[str(self.Terminal - 1) + '_3'] = 100.0
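Both obstacles follow the same back-and-forth pattern, just with a different stride and different end cells. As a sketch of that pattern in isolation (the function name `patrol` and its signed `step` argument are my own illustration; the environment code above stores a direction id instead):

```python
def patrol(pos, step, low, high):
    """Advance one cell along a line, turning around at either end.

    pos       : current flat index
    step      : signed stride (-5 or +5 along a column, -1 or +1 along a row)
    low, high : the two end cells of the patrol, low < high
    Returns the new (pos, step).
    """
    if (step < 0 and pos == low) or (step > 0 and pos == high):
        step = -step  # reverse at the end cell, then still move in this tick
    return pos + step, step


def trace(pos, step, low, high, n):
    """Positions visited over n ticks, not counting the starting cell."""
    visited = []
    for _ in range(n):
        pos, step = patrol(pos, step, low, high)
        visited.append(pos)
    return visited


# Dynamic obstacle 1: starts at 21 heading up, patrols column 1 between 1 and 21.
print(trace(21, -5, 1, 21, 9))   # [16, 11, 6, 1, 6, 11, 16, 21, 16]
# Dynamic obstacle 2: starts at 14 heading left, patrols between 12 and 14.
print(trace(14, -1, 12, 14, 5))  # [13, 12, 13, 14, 13]
```

The traces match what the nested if/else blocks produce: the obstacle never pauses at an end cell, it turns and moves within the same step.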
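At this point we have handled, for a single step, the movement and recording of the obstacles and the update of the rewards.
Next, based on the action passed in, we work out the reward the agent gets for taking it: whether it hits a wall, whether it hits an obstacle, whether it gets closer to the goal, and so on.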
    state = self.state
    key = "%d_%d" % (state, action)
    # Detect whether this action will lead to crashing into the wall
    if key in self.T:
        next_state = self.T[key]
    else:
        next_state = state
        r = -200.0
        is_Done = True
        return next_state, r, is_Done, {}
    # Detect whether this action will lead to crashing into the obstacles
    self.state = next_state  # Update state
    is_Done = False
    if next_state in self.TerminateStates or (next_state in self.temp and state in self.TerminateStates):
        is_Done = True
    if key not in self.Rewards:
        # No explicit reward for this move: +20 if the flat index moved
        # toward the terminal (a step right or down), otherwise -50
        if (self.Terminal - next_state) < (self.Terminal - state):
            r = 20.0
        else:
            r = -50.0
    else:
        r = self.Rewards[key]
    return next_state, r, is_Done, {}
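The "closer to the goal" test in step() compares flat indices, which works here because the destination is the bottom-right corner: every step right or down raises the index and also reduces the grid distance. If the goal sat anywhere else, the index comparison would no longer mean "closer". A hedged alternative, not what my environment does, is to compare Manhattan distances; `manhattan` and `shaping_reward` below are hypothetical names.

```python
SIZE = 5


def manhattan(a, b, size=SIZE):
    """Manhattan distance between two flat cell indices."""
    row_a, col_a = divmod(a, size)
    row_b, col_b = divmod(b, size)
    return abs(row_a - row_b) + abs(col_a - col_b)


def shaping_reward(state, next_state, goal, closer=20.0, farther=-50.0):
    """Small bonus for getting closer to the goal, a penalty otherwise."""
    if manhattan(next_state, goal) < manhattan(state, goal):
        return closer
    return farther


print(shaping_reward(0, 1, 24))    # 20.0: a step right from the start
print(shaping_reward(6, 1, 24))    # -50.0: a step up, away from the goal
print(shaping_reward(14, 13, 12))  # 20.0: with a goal at 12, moving left is progress
```

With the goal at 24 this gives the same +20 and -50 values as the index comparison for every legal move, so swapping it in would not change training on this particular map.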
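With that, the step function is complete.
Here I want to mention a mistake that beginners like me make easily. At first I handed out almost nothing but penalties and hardly any positive reward. After a handful of attempts in which it never saw the reward at the destination, the agent simply gave up: it preferred to hit the wall on the first move and take the -200 rather than try any other route, because the world I had built for it was all penalties, with a reward only at a destination it could not see. agent: "Damn it, this robot life is hopeless, sob." The agent's way to the highest score was to off itself at the start.
Environment reset: reset(self)
The reset function matters too, but there is not much to say about it: it just resets each unit's position and the reward dictionary, so you can copy and paste straight from __init__(self).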
    # Reset states and directions
    self.Terminal = 24
    self.state = 0
    self.StaticObs1 = 10
    self.StaticObs2 = 19
    self.DynamicObs1 = 21
    self.DynamicObs2 = 14
    self.actions = [0, 1, 2, 3]
    self.Obs1Dir = 0
    self.Obs2Dir = 2
    self.gamma = 0.8
    # self.viewer = None
    # Reset episodes stopping criterion
    self.TerminateStates = dict()
    self.TerminateStates[self.StaticObs1] = 1
    self.TerminateStates[self.StaticObs2] = 1
    self.TerminateStates[self.DynamicObs1] = 1
    self.TerminateStates[self.DynamicObs2] = 1
    self.TerminateStates[self.Terminal] = 1
    # Reset rewards dictionary
    self.Rewards = dict()
    self.Rewards[str(self.DynamicObs1 - 5) + '_1'] = -200.0
    self.Rewards[str(self.DynamicObs1 - 1) + '_3'] = -200.0
    self.Rewards[str(self.DynamicObs1 + 1) + '_2'] = -200.0
    self.Rewards[str(self.DynamicObs2 - 5) + '_1'] = -200.0
    self.Rewards[str(self.DynamicObs2 - 1) + '_3'] = -200.0
    self.Rewards[str(self.DynamicObs2 + 5) + '_0'] = -200.0
    self.Rewards[str(self.StaticObs1 - 5) + '_1'] = -200.0
    self.Rewards[str(self.StaticObs1 + 1) + '_2'] = -200.0
    self.Rewards[str(self.StaticObs1 + 5) + '_0'] = -200.0
    self.Rewards[str(self.StaticObs2 - 5) + '_1'] = -200.0
    self.Rewards[str(self.StaticObs2 - 1) + '_3'] = -200.0
    self.Rewards[str(self.StaticObs2 + 5) + '_0'] = -200.0
    self.Rewards[str(self.Terminal - 5) + '_1'] = 100.0
    self.Rewards[str(self.Terminal - 1) + '_3'] = 100.0
    # Kept as in my code; gym's usual convention is to return the
    # initial observation (here self.state) instead
    return self
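Copy and paste works, but the two copies can drift apart. One way to avoid that, sketched here under my own naming (`GridState` and `_set_initial_state` are hypothetical, and the reward dictionary is left out for brevity), is to put the shared assignments in a single helper that both __init__ and reset call:

```python
class GridState:
    """Only the bookkeeping part of the environment; no gym dependency."""

    def __init__(self):
        self.viewer = None  # created lazily by render(), kept across resets
        self._set_initial_state()

    def _set_initial_state(self):
        # Hypothetical helper: everything that must be identical after
        # __init__ and after reset lives here, so there is one copy only.
        self.Terminal = 24
        self.state = 0
        self.StaticObs1, self.StaticObs2 = 10, 19
        self.DynamicObs1, self.DynamicObs2 = 21, 14
        self.Obs1Dir, self.Obs2Dir = 0, 2
        self.TerminateStates = {
            cell: 1
            for cell in (self.StaticObs1, self.StaticObs2,
                         self.DynamicObs1, self.DynamicObs2, self.Terminal)
        }

    def reset(self):
        self._set_initial_state()
        return self.state  # assumption: the caller wants the start state back


grid = GridState()
grid.state, grid.DynamicObs1 = 7, 6  # pretend an episode has been running
print(grid.reset(), grid.DynamicObs1)  # 0 21
```

The viewer is deliberately not touched by the helper, which mirrors the commented-out self.viewer = None line in reset.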
Environment rendering: render(self, mode='human')
There is not much to say here either. It is plain drawing, and a careful read is enough to see how it works.
    from gym.envs.classic_control import rendering
    screen_width = 700
    screen_height = 700
    if self.viewer is None:
        self.viewer = rendering.Viewer(screen_width, screen_height)
        # Plot the GridWorld
        self.line1 = rendering.Line((100, 100), (600, 100))
        self.line2 = rendering.Line((100, 200), (600, 200))
        self.line3 = rendering.Line((100, 300), (600, 300))
        self.line4 = rendering.Line((100, 400), (600, 400))
        self.line5 = rendering.Line((100, 500), (600, 500))
        self.line6 = rendering.Line((100, 600), (600, 600))
        self.line7 = rendering.Line((100, 100), (100, 600))
        self.line8 = rendering.Line((200, 100), (200, 600))
        self.line9 = rendering.Line((300, 100), (300, 600))
        self.line10 = rendering.Line((400, 100), (400, 600))
        self.line11 = rendering.Line((500, 100), (500, 600))
        self.line12 = rendering.Line((600, 100), (600, 600))
        # Plot dynamic obstacle_1
        self.obs1 = rendering.make_circle(40)
        self.obs1trans = rendering.Transform()  # translation=(250, 150)
        self.obs1.add_attr(self.obs1trans)
        self.obs1.set_color(1, 0, 0)
        # Plot dynamic obstacle_2
        self.obs2 = rendering.make_circle(40)
        self.obs2trans = rendering.Transform()
        self.obs2.add_attr(self.obs2trans)
        self.obs2.set_color(1, 0, 0)
        # Plot static obstacle_1
        self.obstacle_1 = rendering.make_circle(40)
        self.obstacle1trans = rendering.Transform()
        self.obstacle_1.add_attr(self.obstacle1trans)
        self.obstacle_1.set_color(0, 0, 0)
        # Plot static obstacle_2
        self.obstacle_2 = rendering.make_circle(40)
        self.obstacle2trans = rendering.Transform()
        self.obstacle_2.add_attr(self.obstacle2trans)
        self.obstacle_2.set_color(0, 0, 0)
        # Plot Terminal
        self.terminal = rendering.make_circle(40)
        self.circletrans = rendering.Transform(translation=(550, 150))
        self.terminal.add_attr(self.circletrans)
        self.terminal.set_color(0, 0, 1)
        # Plot robot
        self.robot = rendering.make_circle(30)
        self.robotrans = rendering.Transform()
        self.robot.add_attr(self.robotrans)
        self.robot.set_color(0, 1, 0)
        self.line1.set_color(0, 0, 0)
        self.line2.set_color(0, 0, 0)
        self.line3.set_color(0, 0, 0)
        self.line4.set_color(0, 0, 0)
        self.line5.set_color(0, 0, 0)
        self.line6.set_color(0, 0, 0)
        self.line7.set_color(0, 0, 0)
        self.line8.set_color(0, 0, 0)
        self.line9.set_color(0, 0, 0)
        self.line10.set_color(0, 0, 0)
        self.line11.set_color(0, 0, 0)
        self.line12.set_color(0, 0, 0)
        self.viewer.add_geom(self.line1)
        self.viewer.add_geom(self.line2)
        self.viewer.add_geom(self.line3)
        self.viewer.add_geom(self.line4)
        self.viewer.add_geom(self.line5)
        self.viewer.add_geom(self.line6)
        self.viewer.add_geom(self.line7)
        self.viewer.add_geom(self.line8)
        self.viewer.add_geom(self.line9)
        self.viewer.add_geom(self.line10)
        self.viewer.add_geom(self.line11)
        self.viewer.add_geom(self.line12)
        self.viewer.add_geom(self.obs1)
        self.viewer.add_geom(self.obs2)
        self.viewer.add_geom(self.obstacle_1)
        self.viewer.add_geom(self.obstacle_2)
        self.viewer.add_geom(self.terminal)
        self.viewer.add_geom(self.robot)
    if self.state is None:
        return None
    # Move every shape to the centre of the cell it currently occupies
    self.robotrans.set_translation(self.x[self.state], self.y[self.state])
    self.obs1trans.set_translation(self.x[self.DynamicObs1], self.y[self.DynamicObs1])
    self.obs2trans.set_translation(self.x[self.DynamicObs2], self.y[self.DynamicObs2])
    self.obstacle1trans.set_translation(self.x[self.StaticObs1], self.y[self.StaticObs1])
    self.obstacle2trans.set_translation(self.x[self.StaticObs2], self.y[self.StaticObs2])
    return self.viewer.render(return_rgb_array=mode == 'rgb_array')
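The twelve grid lines are regular enough to be generated instead of typed out. The sketch below only computes the end points, so it runs without gym; `grid_segments` is a hypothetical helper and its defaults are taken from the coordinates used above.

```python
def grid_segments(size=5, cell=100, margin=100):
    """End points of the (size + 1) horizontal and (size + 1) vertical grid lines."""
    low, high = margin, margin + size * cell
    segments = []
    for k in range(size + 1):
        offset = margin + k * cell
        segments.append(((low, offset), (high, offset)))  # horizontal line
    for k in range(size + 1):
        offset = margin + k * cell
        segments.append(((offset, low), (offset, high)))  # vertical line
    return segments


segments = grid_segments()
print(len(segments))  # 12
print(segments[0])    # ((100, 100), (600, 100)), same as line1
print(segments[11])   # ((600, 100), (600, 600)), same as line12
```

Inside render, each pair could then be passed to rendering.Line(start, end), colored with set_color(0, 0, 0) and added with self.viewer.add_geom in a single loop, replacing the thirty-six hand-written lines.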
Closing the environment: close(self)
    if self.viewer:
        self.viewer.close()
        self.viewer = None
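With that, everything related to the training environment is done!
Next, the training code is much the same as the experts' code.
Training with Q-learning and epsilon-greedy
One thing worth mentioning: during training, once an episode is found to have ended, we need to add one more check to the experts' code. If the episode ended for any reason other than reaching the destination, give a reward of -200.0 directly and update q_table, so that the agent does not get stuck on one particular way of dying. The q_table update happens inside the action-decision function, so under if Done: add: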
    if observation != 24:
        action, state = get_action(state, action, observation, -200.0, episode, 0.5)
Also delete the line just below it, episode_reward = -100, which I did not understand either.
The rest is basically the same as the experts' code!
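For readers who have not opened the referenced posts, here is a minimal sketch of the two pieces the training loop relies on: epsilon-greedy action selection and the tabular Q-learning update. It is a generic textbook version, not the experts' get_action; the function names, the dictionary-based table, and the learning rate of 0.5 are all assumptions of mine.

```python
import random

ACTIONS = [0, 1, 2, 3]  # up, down, left, right
ALPHA = 0.5             # learning rate (assumed value)
GAMMA = 0.8             # discount factor, same value as self.gamma in the environment


def choose_action(q_table, state, epsilon, rng=random):
    """Epsilon-greedy: explore with probability epsilon, otherwise take the best known action."""
    if rng.random() < epsilon:
        return rng.choice(ACTIONS)
    values = [q_table.get((state, a), 0.0) for a in ACTIONS]
    return ACTIONS[values.index(max(values))]


def q_update(q_table, state, action, reward, next_state, done):
    """One tabular Q-learning step: Q <- Q + alpha * (target - Q)."""
    # A terminal transition has no future value to bootstrap from.
    best_next = 0.0 if done else max(q_table.get((next_state, a), 0.0) for a in ACTIONS)
    target = reward + GAMMA * best_next
    old = q_table.get((state, action), 0.0)
    q_table[(state, action)] = old + ALPHA * (target - old)
    return q_table[(state, action)]


q = {}
# The agent walks up into the wall from the start cell and the episode ends.
print(q_update(q, 0, 0, -200.0, 0, True))  # -100.0
# With exploration switched off, the punished action is no longer chosen.
print(choose_action(q, 0, epsilon=0.0))    # 1
```

The extra -200.0 update described above plays the same role as the first call here: it writes the failure into the table right away, so the greedy choice moves off the action that ended the episode.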