Python 生产者消费者 demo

# -*- coding:utf-8 -*-
"""Producer/consumer demo built on ``multiprocessing``.

A download (producer) process pushes "minute" identifiers onto a
``JoinableQueue``; a deal (consumer) process pulls them off and handles them.
Progress, timing and a failure flag are exchanged through shared ``Value``
objects. ``None`` on the queue is the end-of-stream sentinel.
"""
from collections import namedtuple
from multiprocessing import JoinableQueue, Process, Value
import time

# Minutes referenced by the commented-out fault-injection hooks below.
DOWNLOAD_EXCEPTION_MINUTE = 3
DEAL_EXCEPTION_MINUTE = 2

# Default simulated duration, in seconds, of one download / one deal step.
DOWNLOAD_SECONDS = 10
DEAL_SECONDS = 5


class SharedObj(object):
    """Simple object that the demo passes to the consumer process as an argument."""

    def __init__(self, id):
        # ``id`` shadows the builtin, but callers use it as a keyword
        # (``SharedObj(id=1)``), so the parameter name is kept.
        self.id = id


SHARED_OBJ = namedtuple("shared_obj", ["id"])


def download_process(produce_queue, minute_list, sharded_time_cost, shared_is_exception,
                     download_seconds=DOWNLOAD_SECONDS):
    """Producer: "download" each minute and put it on the queue.

    Args:
        produce_queue: queue offering ``put`` and ``join`` (a ``JoinableQueue``
            in the demo).
        minute_list: iterable of minute identifiers to download, in order.
        sharded_time_cost: shared value (``.value`` / ``.get_lock()``) that
            receives the whole-second duration of the download loop.
        shared_is_exception: shared failure flag; checked before each minute
            and set to 1 if this function catches an exception.
        download_seconds: simulated duration of one download, in seconds.

    After the loop (or after a caught exception) a ``None`` sentinel is queued
    and the function blocks in ``produce_queue.join()`` until every queued
    item has been acknowledged with ``task_done()``.
    """

    def process(minute_name):
        print("Begin download process minute:%s." % minute_name)
        # Fault injection: uncomment to exercise the error path.
        # if minute_name == DOWNLOAD_EXCEPTION_MINUTE:
        #     raise Exception("Exception in minute_name:%s when download process." % minute_name)
        time.sleep(download_seconds)
        print("Success download progress minute:%s." % minute_name)

    try:
        start_time = time.time()
        for minute_name in minute_list:
            # Stop producing as soon as any process has reported a failure.
            if shared_is_exception.value:
                print("Some other error happened when begin download minute:%s." % minute_name)
                break
            process(minute_name)
            produce_queue.put(minute_name)
        end_time = time.time()
        with sharded_time_cost.get_lock():
            sharded_time_cost.value = int(end_time - start_time)
    except Exception as e:
        print(e)
        if shared_is_exception.value:
            print("Some other error happened.")
        with shared_is_exception.get_lock():
            shared_is_exception.value = 1
    # Always send the sentinel so the consumer's loop can terminate, then wait
    # for the consumer to acknowledge everything that was queued.
    produce_queue.put(None)
    produce_queue.join()


def deal_process(produce_queue, shared_deal_time_cost, shared_done_minute_name, shared_done_count, shared_is_exception,
                 s, deal_seconds=DEAL_SECONDS):
    """Consumer: take minutes off the queue until the ``None`` sentinel arrives.

    Args:
        produce_queue: queue offering ``get`` and ``task_done``.
        shared_deal_time_cost: shared value accumulating whole seconds spent
            per item.
        shared_done_minute_name: shared value holding the last minute handled.
        shared_done_count: shared value counting handled items.
        shared_is_exception: shared failure flag; when set, items are drained
            without being processed, and it is set to 1 if processing raises.
        s: arbitrary object printed while processing (demo payload).
        deal_seconds: simulated duration of one deal step, in seconds.

    Every item taken from the queue, including the sentinel, is acknowledged
    with ``task_done()`` so that the producer's ``join()`` can return.
    """

    def process(minute_name):
        print("Begin deal minute:%s." % minute_name)
        # Fault injection: uncomment to exercise the error path.
        # if minute_name == DEAL_EXCEPTION_MINUTE:
        #     raise Exception("Exception happened in minute:%s, when deal." % minute_name)
        print(s)
        time.sleep(deal_seconds)
        print("Done for deal minute:%s." % minute_name)

    while True:
        minute_name = produce_queue.get()
        # Dump the shared state before handling the item.
        print("-----deal process----")
        print("shared_deal_time_cost:%s." % shared_deal_time_cost.value)
        print("shared_done_minute_name:%s." % shared_done_minute_name.value)
        print("shared_done_count:%s." % shared_done_count.value)
        print("shared_is_exception:%s." % shared_is_exception.value)
        print("---------------------")
        # ``None`` is the producer's end-of-stream sentinel.
        if minute_name is None:
            print("Get empty minute name, done for deal.")
            produce_queue.task_done()
            break
        try:
            begin_time = time.time()
            if not shared_is_exception.value:
                process(minute_name)
            else:
                print("shared_is_exception is true, exception is happened; continue...")
            end_time = time.time()
            # Accumulate (``+=``) rather than overwrite: these are running
            # totals across all items.
            # NOTE(review): items skipped because the failure flag is set are
            # still counted as done here, as in the original - confirm intent.
            with shared_deal_time_cost.get_lock():
                shared_deal_time_cost.value += int(end_time - begin_time)
            with shared_done_minute_name.get_lock():
                shared_done_minute_name.value = minute_name
            with shared_done_count.get_lock():
                shared_done_count.value += 1
        except Exception as e:
            print(e)
            with shared_is_exception.get_lock():
                shared_is_exception.value = 1
        finally:
            # Acknowledge the item whether or not it was handled successfully.
            produce_queue.task_done()


class TestObj(object):
    """Owns the shared counters and wires the producer and consumer together."""

    def __init__(self):
        # All shared values are C ints ("i") starting at 0.
        self.shared_done_minute = Value("i", 0)
        self.shared_done_count = Value("i", 0)
        self.shared_download_time_cost = Value("i", 0)
        self.shared_deal_time_cost = Value("i", 0)
        self.shared_is_exception = Value("i", 0)

    def main_process(self):
        """Run one producer and one consumer process and print the wall time."""
        minute_list = [1, 2, 3, 4, 5]
        # Queue shared by the two worker processes.
        produce_queue = JoinableQueue()
        download_processor = Process(
            target=download_process,
            args=(produce_queue, minute_list, self.shared_download_time_cost,
                  self.shared_is_exception,))
        # Demo payload handed to the consumer.
        s = SharedObj(id=1)
        print(s)
        deal_processor = Process(
            target=deal_process,
            args=(produce_queue, self.shared_deal_time_cost, self.shared_done_minute,
                  self.shared_done_count, self.shared_is_exception, s,))
        start_time = time.time()
        # Daemon flag must be set before start().
        download_processor.daemon = True
        deal_processor.daemon = True
        download_processor.start()
        deal_processor.start()
        # Wait for both workers to finish.
        download_processor.join()
        deal_processor.join()
        end_time = time.time()
        print("Main time cost:%s." % (end_time - start_time))


def main():
    """Entry point: build a ``TestObj`` and run the demo."""
    test_obj = TestObj()
    test_obj.main_process()


if __name__ == '__main__':
    main()

  

来源:https://www.icode9.com/content-1-755101.html

(0)

相关推荐