(1条消息) python多线程
Python多线程多进程
文章目录
- 并行和并发的概念
- 线程和进程的概念(来点八股文)
- PythonGIL锁相关以及历史
- 多线程编程详解
- 多进程编程详解(重点)
一、什么是并行和并发?
首先我们来先说一下一个简单的共同点,并行和并发都是完成多任务更加有效率的工具。我们下面用一张图来说明它们的不同点
- 并发:是指应用能够交替执行不同的任务,其实并发有带你类似于多线程的原理,多线程并非同事执行多个任务,如果你开多个线程执行,就是在你几乎不可察觉的速度不断的去切换这几个线程,以达到“同时执行的效果”
- 并行:时指应用能够“同时”执行不同的的任务,就像你吃饭的时候可以一边吃饭一边打电话
- 总结来说就是,并发时交替执行,并行时同时执行
线程和进程的概念
在很多教科书上都有一句话:进程时资源分配的最小单位,线程时CPU调度的最小单位。
线程是程序中一个单一的顺序控制流程,是进程内一个相对独立的、可调度的执行单元,是系统独立调度和分配CPU的基本单位中的调度单位。
进程和线程的区别
进程是资源分配的基本单位。所有与该进程有关的资源,都被记录在进程控制块PCB中。以表示该进程拥有这些资源。另外进程也是抢占处理机的调度单位,它拥有一个完整的虚拟地址空间。但进程发生调度的时候,不同的进程拥有不同的虚拟你地址空间,而同一进程内的不同线程共享同一地址空间。
与进程相对应,线程和资源分配无关,它属于某一进程,并与进程内的其他线程一起共享进程的资源。线程只由相关堆栈寄存器和线程控制表TCB组成。
通常在一个进程中可以包含若干个线程,他们可以利用进程所拥有的资源。在引入线程的操作系统中,通常都是把进程作为资源费配的基本单位,而把线程作为独立运行和独立调度的基本单位。由于线程比进程更小。进本上不拥有系统资源,故对它的调度所付出的开销就会小得多,能更搞笑得提高系统内多个程序间得并发执行程度,提高系统资源的利用率。
我们总结下来有以下四项:
- 根本的区别为:进程是操作系统资源分配的基本单位,线程是任务调度和执行的基本单位
- 开销方面:每个进程都有独立的代码和数据空间,程序之间的切换会有较大的开销。线程可以看作轻量级进程,同一类线程共享代码和数据空间,每一个线程都有自己独立的运行栈,线程之间的切换开销小。
- 环境:在操作系统中能同时运行多个进程;在同一个进程中有多个线程同时执行。
- 内存分配方面:系统在运行的时候会为每个进程分配不同的空间;而对线程而言,除CPU外,系统不会为线程分配内存
- 关系:一个进程拥有多个线程,而一个线程只属于一个进程
多进程和多线程比较
对比维度 | 多进程 | 多线程 | 总结 |
---|---|---|---|
数据共享、同步 | 数据共享复杂,同步简单 | 数据共享简单,同步复杂 | 各有优劣 |
内存、CPU | 占用内存多,切换复杂,CPU利用率低 | 占用内存少,切换简单,CPU利用率高 | 线程占优 |
创建、销毁、切换 | 复杂,速度慢 | 简单,速度快 | 线程占优 |
编程、调试 | 编程简单,调试简单 | 编程复杂,调试复杂 | 进程占优 |
可靠性 | 进程间不会互相影响 | 一个线程挂掉将导致整个进程挂掉 | 进程占优 |
GIL锁(全局解释器锁)
每次谈到Python的多线程的时候,我们都会说到GIL锁的问题。
在正式的讲GIL锁之前,我们有一点是需要明确的,就是GIL并不是Python的特性,他是在实现Python解析器(CPython)时引入的概念。而在别的解析器中比如,Jython,IronPython等都是没有全局解释锁的,但CPython时大部分环境下的默认Python执行环境,所以在很多人的概念中CPython就是Python,但这个想法是错的。
GIL本质就是一把互斥锁,既然是互斥锁,所有的互斥锁的本质都是一样的,都是会将并发运行变成串行,以此来控制同一时间内数据只能被一个任务修改,进而保证数据安全。保护不同的数据的安全,就应该加不同的锁。
GIL的版本也在不断的改善之中,在python3.2前,GIL的释放逻辑是当前线程遇见IO操作或者ticks计数达到100(ticks可以看作是python自身的一个计数器,专门做用于GIL,每次释放后归零,这个计数可以通过 sys.setcheckinterval 来调整),进行释放。因为计算密集型线程在释放GIL之后又会立即去申请GIL,并且通常在其它线程还没有调度完之前它就已经重新获取到了GIL,就会导致一旦计算密集型线程获得了GIL,那么它在很长一段时间内都将占据GIL,甚至一直到该线程执行结束。
而在Python3.2之后开始使用新的GIL。新的GIL实现中用一个固定的超时时间来指示当前的线程放弃全局锁。在当前线程保持这个锁,且其他线程请求这个锁时,当前线程就会在5毫秒后被强制释放该锁。该改进在单核的情况下,对于单个线程长期占用GIL的情况有所好转。
总结以下:
有了GIL的存在,Python有这两个特点:
- 进程可以利用多核,但是开销大。
- 多线程开销小,却无法利用多核优势
也就是说,在Python中的多线程是假的多线程,Python解释器虽然可以开启多个线程,但在同一时间只有一个线程在解释器中运行,而做到这一点的正式由于GIL锁的存在,它的存在使得CPU的资源统一时间只会给一个线程使用,而由于开启线程的开销小,所以多线程才能有一篇用武之地。
但Python的多线程还是由用处的,从上面的分析我们可以知道:
- 如果是I/O密集型任务,再多核也没用,即能开再多进程也没用,所以我们利用Python的多线程。
- 如果是计算密集型任务,多进程则更好了。
多线程编程
在Python的多线程编程中,我们主要的是借助于threading模块,而在threading模块中最核心的内容是Thread这个类。我们要创建Thread对象,然后让他们运行,每一个Thread对象代表一个线程,在每一个线程中我们可以让程序处理不同的任务。这就是多线程编程。
创建Thread对象,有两种手段。
- 直接创建 Thread ,将一个 callable 对象从类的构造器传递进去,这个 callable 就是回调函数,用来处理任务。
- 编写一个自定义类继承 Thread,然后复写 run() 方法,在 run() 方法中编写任务处理代码,然后创建这个 Thread 的子类。(类似Java)
我们主要介绍第一种方式
class threading.Thread(group=None, target=None, name=None, args=(), kwargs={}, *, daemon=None)
- group 应该为
None
;为了日后扩展ThreadGroup
类实现而保留。 - target 是用于
run()
方法调用的可调用对象。默认是None
,表示不需要调用任何方法。 - name 是线程名称。默认情况下,由 “Thread-N” 格式构成一个唯一的名称,其中 N 是小的十进制数。
- args 是用于调用目标函数的参数元组。默认是
()
。 - kwargs 是用于调用目标函数的关键字参数字典。默认是
{}
。 - daemon 参数将显式地设置该线程是否为守护模式。如果是
None
(默认值),线程将继承当前线程的守护模式属性。
在Thread中有以下方法
start
()开始线程活动。run
()代表线程活动的方法。join
(timeout=None)等待,直到线程终结。这会阻塞调用这个方法的线程,直到被调用join()
的线程终结 – 不管是正常终结还是抛出未处理异常 – 或者直到发生超时,超时选项是可选的。- name只用于识别的字符串。它没有语义。多个线程可以赋予相同的名称。初始名称由构造函数设置。
is_alive
()返回线程是否存活。- daemon一个表示这个线程是(True)否(False)守护线程的布尔值。
我们列一下简单的代码实例:
import threadingimport time
def test():
for i in range(5): print(threading.current_thread().name+' test ',i) # 获取子线程的名字 time.sleep(0.5)
thread = threading.Thread(target=test,name='TestThread') # 实例化线程thread.start() # 开始线程thread.join() # 添加阻塞,可以改变线程的运行顺序
for i in range(5): print(threading.current_thread().name+' main ', i) print(thread.name+' is alive ', thread.isAlive()) # 判断线程是狗准确 time.sleep(1)
结果
TestThread test 0TestThread test 1TestThread test 2TestThread test 3TestThread test 4MainThread main 0TestThread is alive FalseMainThread main 1TestThread is alive FalseMainThread main 2TestThread is alive FalseMainThread main 3TestThread is alive FalseMainThread main 4TestThread is alive False
如果我门想要主进程结束的时候,子进程也要跟着结果,要怎么做呢,这里就可以用到daemon了。
我们可以设置主线程的执行时间比子线程慢,来检测一下
#-*-coding:utf-8-*-import threadingimport time
def test():
for i in range(10): print(threading.current_thread().name+' test ',i) time.sleep(1)
thread = threading.Thread(target=test,name='TestThread', daemon=True) # 设置守护线程thread.start()
for i in range(10): print(threading.current_thread().name+' main ', i) print(thread.name+' is alive ', thread.isAlive()) time.sleep(0.5)
结果:
MainThread main 0TestThread is alive TrueTestThread test 0MainThread main 1TestThread is alive TrueTestThread test 1MainThread main 2TestThread is alive TrueMainThread main 3TestThread is alive TrueTestThread test 2MainThread main 4TestThread is alive TrueMainThread main 5TestThread is alive TrueTestThread test 3MainThread main 6TestThread is alive TrueMainThread main 7TestThread is alive TrueTestThread test 4MainThread main 8TestThread is alive TrueMainThread main 9TestThread is alive True
我们简单的来看一下第二种实现方式(继承Thread类):
import threadingimport time
class TestThread(threading.Thread):
def __init__(self,name=None): threading.Thread.__init__(self,name=name)
def run(self): for i in range(5): print(threading.current_thread().name + ' test ', i) time.sleep(1) ## 重写run方法
thread = TestThread(name='TestThread')thread.start()
for i in range(5): print(threading.current_thread().name+' main ', i) print(thread.name+' is alive ', thread.isAlive()) time.sleep(1)
介绍完基础部分就要来讲一个比较困难的一部分了,就是多线程中的锁的机制,如何实现多个线程之间的通信和数据同步。
我们用一个经典的问题来解释这个问题:
卖票问题,买票有多个窗口,我们假设有三个,窗口之间共享一个票池,每个窗口都可以买票,直至票池里面没有票可以卖。
代码如下:
import threadingimport random
class WindowThread(threading.Thread):
def __init__(self,name): threading.Thread.__init__(self,name=name) self.name = name self.tickts = 0
def run(self): global tickt_count
while tickt_count > 0:
print('%s notice:There has %d tickts remain ' %(self.name,tickt_count))
if tickt_count > 2: number = random.randint(1,2) else: number = 1
tickt_count -= number self.tickts += number
print('%s have buy %d tickt,the remain tickt\'t count is %d .Already buy %d \n' % (self.name, number, tickt_count, self.tickts))
print('%s notice:There is no tickt can sold! Already sold %d'%(self.name,self.tickts))
tickt_count = 10
window1 = WindowThread('window1')window2 = WindowThread('window2')window3 = WindowThread('window3')
window1.start()window2.start()window3.start()window1.join()window2.join()window3.join()
print('tickt count ',tickt_count)
结果:
window1 notice:There has 10 tickts remain window1 have buy 1 tickt,the remain tickt't count is 9 .Already buy 1 window1 notice:There has 9 tickts remain window1 have buy 1 tickt,the remain tickt't count is 8 .Already buy 2
window2 notice:There has 8 tickts remain window2 have buy 2 tickt,the remain tickt't count is 6 .Already buy 2 window1 notice:There has 6 tickts remain window2 notice:There has 6 tickts remain window2 have buy 1 tickt,the remain tickt't count is 5 .Already buy 3
window1 have buy 2 tickt,the remain tickt't count is 3 .Already buy 4 window1 notice:There has 3 tickts remain window1 have buy 1 tickt,the remain tickt't count is 2 .Already buy 5 window2 notice:There has 2 tickts remain window2 have buy 1 tickt,the remain tickt't count is 1 .Already buy 4 window2 notice:There has 1 tickts remain window3 notice:There has 1 tickts remain window3 have buy 1 tickt,the remain tickt't count is 0 .Already buy 1
window1 notice:There is no tickt can sold! Already sold 5
window2 have buy 1 tickt,the remain tickt't count is -1 .Already buy 5 window3 notice:There is no tickt can sold! Already sold 1window2 notice:There is no tickt can sold! Already sold 5tickt count -1
我们惊奇的发现居然会有 -1出现,这就是数据共享出现了错误,如果我们加上锁:
如下:(自行添加测试噢)
self.lock.acquire()if tickt_count > 0: if tickt_count > 2: number = random.randint(1,2) else: number = 1 tickt_count -= number self.tickts += number
print('%s have buy %d tickt,the remain tickt\'t count is %d .Already buy %d \n' % (self.name, number, tickt_count, self.tickts))self.lock.release()
执行代码片段
lock = threading.Lock()
window1 = WindowThread('window1',lock)window2 = WindowThread('window2',lock)window3 = WindowThread('window3',lock)
使用锁后就不会出现上面的问题了。
lock中有两个方法
- acquire()
- release()
- 两者在单个线程当中都是成对使用的。
多进程编程
由于GIL锁的存在,所以如果我们为了使用多核的优势,我们通常会使用多进程编程。在python多进程编程之中,我们常用的是multiprocessing包。
创建管理进程模块:
- Process(用于创建进程)
- Pool(用于创建管理进程池)
- Queue(用于进程通信,资源共享)
- Value,Array(用于进程通信,资源共享)
- Pipe(用于管道通信)
- Manager(用于资源共享)
我们先来了解一下Process吧:
class multiprocessing.``Process
(group=None, target=None, name=None, args=(), kwargs={}, ***, daemon=None)
其实Process和Thread很相像,两者的使用也是非常的类似。
- group:分组,实际上不使用,值始终为None
- target:表示调用对象,即子进程要执行的任务,你可以传入方法名
- name:为子进程设定名称
- args:要传给target函数的位置参数,以元组方式进行传入。
- kwargs:要传给target函数的字典参数,以字典方式进行传入。
实例方法:
- start():启动进程,并调用该子进程中的p.run()
- run():进程启动时运行的方法,正是它去调用target指定的函数,我们自定义类的类中一定要实现该方法
- terminate():强制终止进程p,不会进行任何清理操作,如果p创建了子进程,该子进程就成了僵尸进程,使用该方法需要特别小心这种情况。如果p还保存了一个锁那么也将不会被释放,进而导致死锁
- is_alive():返回进程是否在运行。如果p仍然运行,返回True
- join([timeout]):进程同步,主进程等待子进程完成后再执行后面的代码。线程等待p终止(强调:是主线程处于等的状态,而p是处于运行的状态)。timeout是可选的超时时间(超过这个时间,父线程不再等待子线程,继续往下执行),需要强调的是,p.join只能join住start开启的进程,而不能join住run开启的进程
实例:(Windows下的Process()必须放在 if _name_ == '_main_' )下运行
from multiprocessing import Processimport os
def run_proc(name): print('Run child process {}({})'.format(name, os.getpid()))
if __name__ == '__main__': print('Parent process {}'.format(os.getpid())) p = Process(target=run_proc, args=('test',)) print('Child process will start') p.start() p.join() print("Child process end") ----------------------------------------------------------------------------- Parent process 27808 Child process will start Run child process test(28372) Child process end
Pool(用于创建管理进程池)
Pool类用于需要执行的目标很多,而手动限制进程数量又太繁琐时,如果目标少且不用控制进程数量则可以用Process类。Pool可以提供指定数量的进程,供用户调用,当有新的请求提交到Pool中时,如果池还没有满,那么就会创建一个新的进程用来执行该请求;但如果池中的进程数已经达到规定最大值,那么该请求就会等待,直到池中有进程结束,就重用进程池中的进程。
class multiprocessing.pool.``Pool
([processes[, initializer[, initargs[, maxtasksperchild[, context]]]]])
- processes :要创建的进程数,如果省略,将默认使用cpu_count()返回的数量。
- initializer:每个工作进程启动时要执行的可调用对象,默认为None。如果initializer是None,那么每一个工作进程在开始的时候会调用initializer(*initargs)。
- initargs:是要传给initializer的参数组。
- maxtasksperchild:工作进程退出之前可以完成的任务数,完成后用一个新的工作进程来替代原进程,来让闲置的资源被释放。maxtasksperchild默认是None,意味着只要Pool存在工作进程就会一直存活。
- context: 用在制定工作进程启动时的上下文,一般使用Pool() 或者一个context对象的Pool()方法来创建一个池,两种方法都适当的设置了context。
实例方法:
- apply(func[, args[, kwargs]]):在一个池工作进程中执行func(args,*kwargs),然后返回结果。需要强调的是:此操作并不会在所有池工作进程中并执行func函数。如果要通过不同参数并发地执行func函数,必须从不同线程调用p.apply()函数或者使用p.apply_async()。它是阻塞的。apply很少使用
- apply_async(func[, arg[, kwds={}[, callback=None]]]):在一个池工作进程中执行func(args,*kwargs),然后返回结果。此方法的结果是AsyncResult类的实例,callback是可调用对象,接收输入参数。当func的结果变为可用时,将理解传递给callback。callback禁止执行任何阻塞操作,否则将接收其他异步操作中的结果。它是非阻塞。
- map(func, iterable[, chunksize=None]):Pool类中的map方法,与内置的map函数用法行为基本一致,它会使进程阻塞直到返回结果。注意,虽然第二个参数是一个迭代器,但在实际使用中,必须在整个队列都就绪后,程序才会运行子进程。
- map_async(func, iterable[, chunksize=None]):map_async与map的关系同apply与apply_async
- imap():imap 与 map的区别是,map是当所有的进程都已经执行完了,并将结果返回了,imap()则是立即返回一个iterable可迭代对象。
- imap_unordered():不保证返回的结果顺序与进程添加的顺序一致。
- close():关闭进程池,防止进一步操作。如果所有操作持续挂起,它们将在工作进程终止前完成。
- join():等待所有工作进程退出。此方法只能在close()或teminate()之后调用,让其不再接受新的Process。
- terminate():结束工作进程,不再处理未处理的任务。
from multiprocessing import Pool
def test(i): print(i)
if __name__ == '__main__': lists = range(100) pool = Pool(10) pool.map(test, lists) pool.close() pool.join()
# 异步进程池#-*-coding:utf-8-*-from multiprocessing import Pool
def test(i): print(i)
if __name__ == '__main__': pool = Pool(8) for i in range(100): ''' For循环中执行步骤: (1)循环遍历,将100个子进程添加到进程池(相对父进程会阻塞) (2)每次执行8个子进程,等一个子进程执行完后,立马启动新的子进程。(相对父进程不阻塞) apply_async为异步进程池写法。异步指的是启动子进程的过程,与父进程本身的执行(print)是异步的,而For循环中往进程池添加子进程的过程,与父进程本身的执行却是同步的。 ''' pool.apply_async(test, args=(i,)) print("test") pool.close() pool.join()
Queue(用于进程通信,资源共享)
在使用多进程的过程中,最好不要使用共享资源。普通的全局变量是不能被子进程所共享的,只有通过Multiprocessing组件构造的数据结构可以被共享。
Queue是用来创建进程间资源共享的队列的类,使用Queue可以达到多进程间数据传递的功能(缺点:只适用Process类,不能在Pool进程池中使用)。
class multiprocessing.``Queue
([maxsize])
- maxsize是队列中允许最大项数,省略则无大小限制
实例方法:
- put():用以插入数据到队列。put方法还有两个可选参数:blocked和timeout。如果blocked为True(默认值),并且timeout为正值,该方法会阻塞timeout指定的时间,直到该队列有剩余的空间。如果超时,会抛出Queue.Full异常。如果blocked为False,但该Queue已满,会立即抛出Queue.Full异常。
- get():可以从队列读取并且删除一个元素。get方法有两个可选参数:blocked和timeout。如果blocked为True(默认值),并且timeout为正值,那么在等待时间内没有取到任何元素,会抛出Queue.Empty异常。如果blocked为False,有两种情况存在,如果Queue有一个值可用,则立即返回该值,否则,如果队列为空,则立即抛出Queue.Empty异常。若不希望在empty的时候抛出异常,令blocked为True或者参数全部置空即可。
- get_nowait():同q.get(False)
- put_nowait():同q.put(False)
- empty():调用此方法时q为空则返回True,该结果不可靠,比如在返回True的过程中,如果队列中又加入了项目。
- full():调用此方法时q已满则返回True,该结果不可靠,比如在返回True的过程中,如果队列中的项目被取走。
- qsize():返回队列中目前项目的正确数量,结果也不可靠,理由同q.empty()和q.full()一样
#-*-coding:utf-8-*-from multiprocessing import Process, Queueimport os, time, random
def write(q): print('Process to write {}'.format(os.getpid())) for value in ['A', 'B', 'C']: print('Put {} to queue'.format(value)) q.put(value) time.sleep(random.random())
def read(q): print('Process to read: {}'.format(os.getpid())) while True: value = q.get(True) print('Get {} from queue'.format(value))
if __name__ == '__main__': q = Queue() pw = Process(target=write, args=(q, )) pr = Process(target=read, args=(q, )) pw.start() pr.start() pw.join() # 等待pw借宿 pr.terminate() # pr进程是死循环,无法等待器借宿,只能强行终止
Pipe(用于管道通信)
多进程还有一种数据传递方式叫管道原理和 Queue相同。Pipe可以在进程之间创建一条管道,并返回元组(conn1,conn2),其中conn1,conn2表示管道两端的连接对象,强调一点:必须在产生Process对象之前产生管道。
multiprocessing.``Pipe
([duplex])
- dumplex:默认管道是全双工的,如果将duplex射成False,conn1只能用于接收,conn2只能用于发送。
实例方法:
- send(obj):通过连接发送对象。obj是与序列化兼容的任意对象
- recv():接收conn2.send(obj)发送的对象。如果没有消息可接收,recv方法会一直阻塞。如果连接的另外一端已经关闭,那么recv方法会抛出EOFError。
- close():关闭连接。如果conn1被垃圾回收,将自动调用此方法
- fileno():返回连接使用的整数文件描述符
- poll([timeout]):如果连接上的数据可用,返回True。timeout指定等待的最长时限。如果省略此参数,方法将立即返回结果。如果将timeout射成None,操作将无限期地等待数据到达。
- recv_bytes([maxlength]):接收c.send_bytes()方法发送的一条完整的字节消息。maxlength指定要接收的最大字节数。如果进入的消息,超过了这个最大值,将引发IOError异常,并且在连接上无法进行进一步读取。如果连接的另外一端已经关闭,再也不存在任何数据,将引发EOFError异常。
- send_bytes(buffer [, offset [, size]]):通过连接发送字节数据缓冲区,buffer是支持缓冲区接口的任意对象,offset是缓冲区中的字节偏移量,而size是要发送字节数。结果数据以单条消息的形式发出,然后调用c.recv_bytes()函数进行接收
- recv_bytes_into(buffer [, offset]):接收一条完整的字节消息,并把它保存在buffer对象中,该对象支持可写入的缓冲区接口(即bytearray对象或类似的对象)。offset指定缓冲区中放置消息处的字节位移。返回值是收到的字节数。如果消息长度大于可用的缓冲区空间,将引发BufferTooShort异常。
from multiprocessing import Process, Pipeimport time
def f(subconn): time.sleep(1) subconn.send("吃了吗?") print("来自父亲的问候:", subconn.recv()) subconn.close()
if __name__ == '__main__': parent_conn, child_conn = Pipe() # 创建管道两端 p = Process(target=f, args=(child_conn, )) # 创建子进程 p.start() print("来自儿子的问候:", parent_conn.recv()) parent_conn.send("嗯")
Lock(互斥锁)
Lock锁的作用是当多个进程需要访问共享资源的时候,避免访问的冲突。加锁保证了多个进程修改同一块数据时,同一时间只能有一个修改,即串行的修改,牺牲了速度但保证了数据安全。Lock包含两种状态——锁定和非锁定,以及两个基本的方法。
构造方法:Lock()
实例方法:
- acquire([timeout]): 使线程进入同步阻塞状态,尝试获得锁定。
- release(): 释放锁。使用前线程必须已获得锁定,否则将抛出异常。
from multiprocessing import Process, Lock
def l(lock, num): lock.acquire() print("Hello Num: %s" % (num)) lock.release()
if __name__ == '__main__': lock = Lock() # 这个一定要定义为全局 for num in range(20): Process(target=l, args=(lock, num)).start()
下期的内容将会介绍:协程和异步编程!
哈喽,我是海森堡,如果觉得文章对你有帮助,欢迎分享给你的朋友,也给我点个在看,这对我非常重要,给各位哥哥姐姐们抱拳,我们下次见