04消息队列zmq的发布者-订阅者的计算π的简单程序。

# 小知识:计算π的其中一个方法是,随机的向一个边长为n的正方形中撒豆子。# 然后看这些豆子是否在以n为半径的四分之一圆内,正方形面积:n*n,四分之一圆的面积:π*n*n/4# 因此落在四分之一圆内的概率为π/4,这样我们就能算出π的值。# 我们这个程序是来讲述zmq发布-订阅过程的流程,

# 通过计算π值为例子来进行计算。# 首先发布者bitsource随机生成一个字符串,字符串偶数位为纵坐标,奇数位为横坐标。# 一个字符串表示一个点,将奇数位转换为横坐标,将偶数位转换为纵坐标,然后通过计算# 这个点到原点的距离,来判断是否为以半径为B的圆内,通过大量的模拟,来进行计算π。

import random, threading, time, zmqB = 32  # number of bits of precision in each random integer

def ones_and_zeros(digits):    """Express `n` in at least `d` binary digits, with no special prefix."""    # getrandbits()    # 方法返回指定大小(以位为单位)的整数。    return bin(random.getrandbits(digits)).lstrip('0b').zfill(digits)

# 发布者def bitsource(zcontext, url):    # 发布订阅函数,发布者。    """Produce random points in the unit square."""    # 创造一个对象。发布者对象    zsock = zcontext.socket(zmq.PUB)    # 绑定URL    zsock.bind(url)    while True:        # 持续不断的一直发送这里是32位的二进制字符。        # 这里一个字符串代表一个点。        zsock.send_string(ones_and_zeros(B * 2))        # 停顿0.01秒。        time.sleep(0.01)# 订阅者1def always_yes(zcontext, in_url, out_url):    """Coordinates in the lower-left quadrant are inside the unit circle."""    # 创造一个订阅者对象。    isock = zcontext.socket(zmq.SUB)    # 连接发布者的URL    isock.connect(in_url)    # 设置过滤条件,接收00开头的    # 如果开头为00的话,那么这个点x、y值都不会超过半径的一半    # 一定在四分之一圆内。    isock.setsockopt(zmq.SUBSCRIBE, b'00')    # 推送。    osock = zcontext.socket(zmq.PUSH)    # 连接接收推送的URL    osock.connect(out_url)    while True:        # 接收订阅者的消息。        isock.recv_string()        # 推送给发布者消息。因此我们这里直接发送Y        osock.send_string('Y')# 订阅者2def judge(zcontext, in_url, pythagoras_url, out_url):    """Determine whether each input coordinate is inside the unit circle."""    #     # 创造一个订阅者对象。    isock = zcontext.socket(zmq.SUB)    # 连接URL    isock.connect(in_url)    # 设置接收订阅的过滤条件。    for prefix in b'01', b'10', b'11':        isock.setsockopt(zmq.SUBSCRIBE, prefix)    # 设置一个响应对象。    psock = zcontext.socket(zmq.REQ)    psock.connect(pythagoras_url)    # 设置一个推送对象。    osock = zcontext.socket(zmq.PUSH)    osock.connect(out_url)    # 这里用了勾股定理,是两个落在坐标轴上的点,平方和。    unit = 2 ** (B * 2)    # 这里需要计算是否在四分之一圆内。    while True:        # 接收发布者的消息。        bits = isock.recv_string()        # 提取这个点的x坐标,y坐标。        n, m = int(bits[::2], 2), int(bits[1::2], 2)        # 发送给客户请求端        psock.send_json((n, m))        # 然后接受客户请求端发送过来处理过的数据。        sumsquares = psock.recv_json()        # 判断是否在圆内。        osock.send_string('Y' if sumsquares < unit else 'N')# 请求端,def pythagoras(zcontext, url):    """Return the sum-of-squares of number sequences."""    zsock = zcontext.socket(zmq.REP)    zsock.bind(url)    while True:        # 这里先请求数据,然后将请求的数据进行处理发送出去。        numbers = zsock.recv_json()        zsock.send_json(sum(n * n for n in numbers))# 汇总,进行计算π的值。def tally(zcontext, url):    """Tally how many points fall within the unit circle, and print pi."""    zsock = zcontext.socket(zmq.PULL)    zsock.bind(url)    # 这里是如果接受到一个Y p+4,接受到一个N ,q + 1    # 然后计算比值,这个就是我们算出来的π的值。    p = q = 0    while True:        decision = zsock.recv_string()        q += 1        if decision == 'Y':            p += 4        print(decision, p / q)

# 我们使用多线程的方式,上边的每一个函数开一个线程,。def start_thread(function, *args):    thread = threading.Thread(target=function, args=args)    thread.daemon = True  # so you can easily Ctrl-C the whole program    thread.start()

def main(zcontext):    pubsub = 'tcp://127.0.0.1:6700'    reqrep = 'tcp://127.0.0.1:6701'    pushpull = 'tcp://127.0.0.1:6702'    start_thread(bitsource, zcontext, pubsub)    start_thread(always_yes, zcontext, pubsub, pushpull)    start_thread(judge, zcontext, pubsub, reqrep, pushpull)    start_thread(pythagoras, zcontext, reqrep)    start_thread(tally, zcontext, pushpull)    # 这个是主线程,主线程结束,其他的多线程也就要结束。    time.sleep(30)

if __name__ == '__main__':    main(zmq.Context())
(0)

相关推荐

  • 爬虫爬取代理ip

    import urllib.request from bs4 import BeautifulSoup import re import time import random # ---------- ...

  • flask基础学习一

    源码: from flask import Flask,redirect,url_for app = Flask(__name__) @app.route("/home") def ...

  • 消息队列和发布订阅

    编程语言集成了发布订阅 很多编程语言框架里都提供了发布订阅的组件,或者叫事件处理机制,而spring框架对这个功能也有支持,主要使用EventListener实现订阅,使用ApplicationEve ...

  • 消息队列在RTOS的应用

    传说互联网应用有两大利器,一个是缓存,另一个就是消息队列. 一直相对消息队列做一下梳理,希望早日另有成文. 一叶知秋,实际上消息队列在嵌入式系统中同样有着广泛的应用. 近来致力于IoT和智能硬件,现学 ...

  • 手把手教姐姐写消息队列

    前言 这周姐姐入职了新公司,老板想探探他的底,看了一眼他的简历,呦呵,精通kafka,这小姑娘有两下子,既然这样,那你写一个消息队列吧.因为要用go语言写,这可给姐姐愁坏了.赶紧来求助我,我这么坚贞不 ...

  • C#后台异步消息队列实现

    简介 基于生产者消费者模式,我们可以开发出线程安全的异步消息队列. 知识储备 什么是生产者消费者模式? 为了方便理解,我们暂时将它理解为垃圾的产生到结束的过程. 简单来说,多住户产生垃圾(生产者)将垃 ...

  • RabbitMQ消息队列之Windows下安装和部署(一)

    参考文档: https://jingyan.baidu.com/article/ed15cb1bb5c3411be369819d.html https://blog.csdn.net/hzw19920 ...

  • RabbitMQ 消息队列中 VirtualHost介绍 与权限管理 | IT工程师的生活足迹

    一.VirtualHost 像mysql服务有数据库的概念并且可以设置用户对库和表等对象的操作权限,RabbitMQ也有类似的权限管理. 在RabbitMQ中可以虚拟消息服务器 VirtualHost ...

  • Redis、Kafka 和 Pulsar 消息队列对比

    刘德恩 云时代架构 一.最基础的队列 最基础的消息队列其实就是一个双端队列,我们可以用双向链表来实现,如下图所示: push_front:添加元素到队首: pop_tail:从队尾取出元素. 有了这样 ...

  • Go 中如何让消息队列达到最大吞吐量?

    kevwan Go语言中文网 今天 你在使用消息队列的时候关注过吞吐量吗? 思考过吞吐量的影响因素吗? 考虑过怎么提高吗? 总结过最佳实践吗? 本文带你一起探讨下消息队列消费端高吞吐的 Go 框架实现 ...

  • 消息队列之activeMQ

    消息队列之RabbitMQ 消息队列之kafka 1.activeMQ的主要功能 实现高可用.高伸缩.高性能.易用和安全的企业级面向消息服务的系统 异步消息的消费和处理 控制消息的消费顺序 可以和Sp ...