04消息队列zmq的发布者-订阅者的计算π的简单程序。
# 小知识:计算π的其中一个方法是,随机的向一个边长为n的正方形中撒豆子。# 然后看这些豆子是否在以n为半径的四分之一圆内,正方形面积:n*n,四分之一圆的面积:π*n*n/4# 因此落在四分之一圆内的概率为π/4,这样我们就能算出π的值。# 我们这个程序是来讲述zmq发布-订阅过程的流程, # 通过计算π值为例子来进行计算。# 首先发布者bitsource随机生成一个字符串,字符串偶数位为纵坐标,奇数位为横坐标。# 一个字符串表示一个点,将奇数位转换为横坐标,将偶数位转换为纵坐标,然后通过计算# 这个点到原点的距离,来判断是否为以半径为B的圆内,通过大量的模拟,来进行计算π。 import random, threading, time, zmqB = 32 # number of bits of precision in each random integer def ones_and_zeros(digits): """Express `n` in at least `d` binary digits, with no special prefix.""" # getrandbits() # 方法返回指定大小(以位为单位)的整数。 return bin(random.getrandbits(digits)).lstrip('0b').zfill(digits) # 发布者def bitsource(zcontext, url): # 发布订阅函数,发布者。 """Produce random points in the unit square.""" # 创造一个对象。发布者对象 zsock = zcontext.socket(zmq.PUB) # 绑定URL zsock.bind(url) while True: # 持续不断的一直发送这里是32位的二进制字符。 # 这里一个字符串代表一个点。 zsock.send_string(ones_and_zeros(B * 2)) # 停顿0.01秒。 time.sleep(0.01)# 订阅者1def always_yes(zcontext, in_url, out_url): """Coordinates in the lower-left quadrant are inside the unit circle.""" # 创造一个订阅者对象。 isock = zcontext.socket(zmq.SUB) # 连接发布者的URL isock.connect(in_url) # 设置过滤条件,接收00开头的 # 如果开头为00的话,那么这个点x、y值都不会超过半径的一半 # 一定在四分之一圆内。 isock.setsockopt(zmq.SUBSCRIBE, b'00') # 推送。 osock = zcontext.socket(zmq.PUSH) # 连接接收推送的URL osock.connect(out_url) while True: # 接收订阅者的消息。 isock.recv_string() # 推送给发布者消息。因此我们这里直接发送Y osock.send_string('Y')# 订阅者2def judge(zcontext, in_url, pythagoras_url, out_url): """Determine whether each input coordinate is inside the unit circle.""" # # 创造一个订阅者对象。 isock = zcontext.socket(zmq.SUB) # 连接URL isock.connect(in_url) # 设置接收订阅的过滤条件。 for prefix in b'01', b'10', b'11': isock.setsockopt(zmq.SUBSCRIBE, prefix) # 设置一个响应对象。 psock = zcontext.socket(zmq.REQ) psock.connect(pythagoras_url) # 设置一个推送对象。 osock = zcontext.socket(zmq.PUSH) osock.connect(out_url) # 这里用了勾股定理,是两个落在坐标轴上的点,平方和。 unit = 2 ** (B * 2) # 这里需要计算是否在四分之一圆内。 while True: # 接收发布者的消息。 bits = isock.recv_string() # 提取这个点的x坐标,y坐标。 n, m = int(bits[::2], 2), int(bits[1::2], 2) # 发送给客户请求端 psock.send_json((n, m)) # 然后接受客户请求端发送过来处理过的数据。 sumsquares = psock.recv_json() # 判断是否在圆内。 osock.send_string('Y' if sumsquares < unit else 'N')# 请求端,def pythagoras(zcontext, url): """Return the sum-of-squares of number sequences.""" zsock = zcontext.socket(zmq.REP) zsock.bind(url) while True: # 这里先请求数据,然后将请求的数据进行处理发送出去。 numbers = zsock.recv_json() zsock.send_json(sum(n * n for n in numbers))# 汇总,进行计算π的值。def tally(zcontext, url): """Tally how many points fall within the unit circle, and print pi.""" zsock = zcontext.socket(zmq.PULL) zsock.bind(url) # 这里是如果接受到一个Y p+4,接受到一个N ,q + 1 # 然后计算比值,这个就是我们算出来的π的值。 p = q = 0 while True: decision = zsock.recv_string() q += 1 if decision == 'Y': p += 4 print(decision, p / q) # 我们使用多线程的方式,上边的每一个函数开一个线程,。def start_thread(function, *args): thread = threading.Thread(target=function, args=args) thread.daemon = True # so you can easily Ctrl-C the whole program thread.start() def main(zcontext): pubsub = 'tcp://127.0.0.1:6700' reqrep = 'tcp://127.0.0.1:6701' pushpull = 'tcp://127.0.0.1:6702' start_thread(bitsource, zcontext, pubsub) start_thread(always_yes, zcontext, pubsub, pushpull) start_thread(judge, zcontext, pubsub, reqrep, pushpull) start_thread(pythagoras, zcontext, reqrep) start_thread(tally, zcontext, pushpull) # 这个是主线程,主线程结束,其他的多线程也就要结束。 time.sleep(30) if __name__ == '__main__': main(zmq.Context())
赞 (0)