Python 多进程并发与 Pool 多线程

一.多进程:

当计算机运行程序时,就会创建包含代码和状态的进程。这些进程会通过计算机的一个或多个CPU执行。不过,同一时刻每个CPU只会执行一个进程,然后不同进程间快速切换,给我们一种错觉,感觉好像多个程序在同时进行。例如:有一个大型工厂,该工厂负责生产电脑,工厂有很多的车间用来生产不同的电脑部件。每个车间又有很多工人互相合作、共享资源来生产某个电脑部件。这里的工厂相当于一个爬虫工程,每个车间相当于一个进程,每个工人就相当于一个线程;线程是CPU调度的基本单元。

需要注意的是:在单核CPU系统中,真正的并行是不可能的,单核只能通过快速切换进程来实现并发。

1.顺序执行

2.多进程并发 注意除了时间的加速以外,也要看看函数返回值的写法:带有多进程的map会返回一个结果列表

  1. import requests
  2. import re
  3. import time
  4. from multiprocessing import Pool
  5. from multiprocessing.dummy import Pool as ThreadPool
  6. def spyder(url):
  7. # res = []
  8. res = {'init:':'hello'}
  9. print('hahah:{}'.format(url))
  10. time.sleep(1)
  11. # res.append(url)
  12. res.update({'entr:'+url:url})
  13. return res
  14. def use_process():
  15. urls = ["https://www.qiushibaike.com/text/page/{}/".format(str(i)) for i in range(0, 4)]
  16. start_1 = time.time()
  17. #获取函数返回结果
  18. res1 = []
  19. for url in urls:
  20. res_ = spyder(url)
  21. res1.append(res_)
  22. end_1 = time.time()
  23. print("单进程:", end_1 - start_1)
  24. print('res1:', res1)
  25. # 获取函数返回结果
  26. #  进程池
  27. start_2 = time.time()
  28. pool = Pool(processes=2)
  29. res2 = pool.map(spyder, urls)
  30. pool.close()
  31. pool.join()
  32. print('res2:', res2)
  33. end_2 = time.time()
  34. print("2进程:", end_2 - start_2)
  35. # 获取函数返回结果
  36. # 进程池
  37. start_3 = time.time()
  38. pool = Pool(processes=4)
  39. res3 = pool.map(spyder, urls)
  40. pool.close()
  41. pool.join()
  42. print('res2:', res3)
  43. end_3 = time.time()
  44. print("4进程:", end_3 - start_3)
  45. if __name__ == "__main__":
  46. use_process()

2.多线程

2.1 thread多线程

  1. import time
  2. import _thread
  3. from threading import Thread
  4. # 使用线程锁,防止线程死锁
  5. mutex = _thread.allocate_lock()
  6. def test(d_num):
  7. d_num.append(89)
  8. print("test: %s"% str(d_num))
  9. def test1(d_num):
  10. print("test1: %s"% str(d_num))
  11. def main():
  12. d_num = [100, 58]
  13. t1 = Thread(target=test, args=(d_num,))
  14. t2 = Thread(target=test1, args=(d_num,))
  15. t1.start()
  16. time.sleep(1)
  17. t2.start()
  18. time.sleep(1)
  19. if __name__ == '__main__':
  20. main()

2.2 多线程队列版

  1. import time
  2. import _thread
  3. from threading import Thread
  4. import queue
  5. # 使用线程锁,防止线程死锁
  6. mutex = _thread.allocate_lock()
  7. frame_queue = queue.Queue()
  8. def test(d_num):
  9. print("test: %s" % str(d_num))
  10. for i in range(d_num):
  11. frame_queue.put(i)
  12. def test1():
  13. while 1:
  14. if frame_queue.empty() != True:
  15. # 从队列中取出图片
  16. value = frame_queue.get()
  17. print('==value:', value)
  18. time.sleep(1)
  19. else:
  20. break
  21. def main():
  22. d_num = 10
  23. t1 = Thread(target=test, args=(d_num,))
  24. t1.start()
  25. t2 = Thread(target=test1)
  26. t2.start()
  27. if __name__ == '__main__':
  28. main()

2.3 注意传参与多进程的区别,线程池

  1. from functools import partial
  2. from itertools import repeat
  3. from multiprocessing import Pool, freeze_support
  4. def func(a, b):
  5. return a + b
  6. def main():
  7. a_args = [1, 2, 3]
  8. second_arg = 1
  9. with Pool() as pool:
  10. L = pool.starmap(func, [(1, 1), (2, 1), (3, 1)])
  11. print('L:', L)
  12. M = pool.starmap(func, zip(a_args, repeat(second_arg)))
  13. print('M:', M)
  14. N = pool.map(partial(func, b=second_arg), a_args)
  15. print('N:', N)
  16. main()
  1. import requests
  2. import re
  3. import time
  4. from multiprocessing import Pool
  5. from multiprocessing.dummy import Pool as ThreadPool
  6. def spyder(url):
  7. # res = []
  8. res = {'init:':'hello'}
  9. print('hahah:{}'.format(url))
  10. time.sleep(1)
  11. # res.append(url)
  12. res.update({'entr:'+url:url})
  13. return res
  14. def use_process():
  15. urls = ["https://www.qiushibaike.com/text/page/{}/".format(str(i)) for i in range(0, 4)]
  16. start_1 = time.time()
  17. #获取函数返回结果
  18. res1 = []
  19. for url in urls:
  20. res_ = spyder(url)
  21. res1.append(res_)
  22. end_1 = time.time()
  23. print("单进程:", end_1 - start_1)
  24. print('res1:', res1)
  25. # 获取函数返回结果
  26. #  进程池
  27. start_2 = time.time()
  28. pool = Pool(processes=2)
  29. res2 = pool.map(spyder, urls)
  30. pool.close()
  31. pool.join()
  32. print('res2:', res2)
  33. end_2 = time.time()
  34. print("2进程:", end_2 - start_2)
  35. # 获取函数返回结果
  36. # 进程池
  37. start_3 = time.time()
  38. pool = Pool(processes=4)
  39. res3 = pool.map(spyder, urls)
  40. pool.close()
  41. pool.join()
  42. print('res2:', res3)
  43. end_3 = time.time()
  44. print("4进程:", end_3 - start_3)
  45. def use_threadpool():
  46. urls = [["https://www.qiushibaike.com/text/page/{}/".format(str(i))] for i in range(0, 4)]
  47. print('urls:', urls)
  48. # 线程池
  49. start = time.time()
  50. pool = ThreadPool(processes=4)
  51. res = pool.starmap(spyder, urls)
  52. pool.close()
  53. pool.join()
  54. end = time.time()
  55. print('res:', res)
  56. print("4线程:", end - start)
  57. if __name__ == "__main__":
  58. # use_process()
  59. use_threadpool()

实际应用将图片路径和名字传入,用zip方式打包传参

  1. import os
  2. import cv2
  3. import time
  4. import itertools
  5. from multiprocessing.dummy import Pool as ThreadPool
  6. SIZE = (75,75)
  7. SAVE_DIRECTORY='thumbs'
  8. def save_img(filename,save_path):
  9. save_path+= filename.split('/')[-1]
  10. im = cv2.imread(filename)
  11. im=cv2.resize(im,SIZE)
  12. cv2.imwrite(save_path,im)
  13. if __name__ == '__main__':
  14. path='./data/testlabel'
  15. print(path)
  16. output_path='./data/thumbs/'
  17. if not os.path.exists(output_path):
  18. os.mkdir(output_path)
  19. print(output_path)
  20. imgs_list_path=[os.path.join(path,i) for i in os.listdir(path)]
  21. print(len(imgs_list_path))
  22. start_time=time.time()
  23. pool = ThreadPool(processes=8)
  24. print(list(zip(imgs_list_path,[output_path]*len(imgs_list_path))))
  25. pool.starmap(save_img,zip(imgs_list_path,[output_path]*len(imgs_list_path)))
  26. pool.close()
  27. pool.join()
  28. end_time=time.time()
  29. print('use time=',end_time-start_time)

 

(0)

相关推荐