rabbitMq 学习笔记(一)

  消息队列中间件是分布式系统中重要的组件,主要解决应用耦合,异步消息,流量削锋等问题

实现高性能,高可用,可伸缩和最终一致性架构。

  RabbitMQ 是采用 Erlang 语言实现 AMQP (Advanced Message Queuing Protocol,高级消息 队列协议)的消息中间件,它最初起源于金融系统,用于在分布式系统中存储转发消息。

  RabbitMQ 整体上是一个生产者与消费者模型,主要负责接收、存储和转发消息。可以把消息传递的过程想象成:当你将一个包裹送到邮局,邮局会暂存并最终将邮件通过邮递员送到收件人的手上, RabbitMQ 就好比由邮局、邮箱和邮递员组成的一个系统。从计算机术语层面来说,RabbitMQ 模型更像是一种交换机模型  

  

  Producer:生产者,就是投递消息的一方。

  生产者创建消息,然后发布到 RabbitMQ 中。消息一般可以包含 2 个部分:消息体和标签 CLabel)。消息体也可以称之为 payload,在实际应用中,消息体一般是一个带有业务逻辑结构的数据,比如一个 JSON 字符串。当然可以进一步对这个消息体进行序列化操作。消息的标签用来表述这条消息 , 比如一个交换器的名称和一个路由键。 生产者把消息交由 RabbitMQ, RabbitMQ 之后会根据标签把消息发送给感兴趣的消费者 CConsumer)。

   Consumer: 消费者, 就是接收消息的一方。

   消费者连接到 RabbitMQ 服务器,并订阅到队列上。 当消费者消费一条消息时, 只是消费 消息的消息体 Cpayload)。 在消息路由的过程中 , 消息的标签会丢弃, 存入到队列中的消息只 有消息体,消费者也只会消费到消息体, 也就不知道消息的生产者是谁,当然消费者也不需要 知道。

   Broker: 消息中间件的服务节点。
   对于 RabbitMQ 来说, 一个 RabbitMQ Broker 可 以简单地看作一个 RabbitMQ 服务节点 , 或者 RabbitMQ 服务实例 。 大多数情况下也可以将一个 RabbitMQ Broker 看作一台 RabbitMQ 服务器。

  交换器(Exchange)

  Rabbitmq中,生产者会将消息先发送到交换器,然后由交换器根据路由规则将消息转发到队列中,如果路由不到,或许会返回给生产者,或许直接丢弃。

  交换器有四种类型:fanout,direct,topic,header   

  Binding: 绑定。 RabbitMQ 中通过绑定将交换器与队列关联起来,在绑定的时候一般会指定一个绑定键 (BindingKey),这样 RabbitMQ 就知道如何正确地将消息路由到队列了。

  routingkey:路由键。生产者将消息发送给交换器时,一般会指定一个routingkey,用来指定消息的路由规则, routingkey需要与交换器类型和绑定键 (BindingKey) 联合使用才能最终生效。

  队列(queue)

  队列是Rabbitmq的内部对象,用于存储消息。Rabbitmq的消息只能存储在队列中。消费者可以从队列中获取消息并消费。如多个消费者订阅同一个队列,这时队列内的消息会被平均分摊(轮询)给多个消费者处理。

  交换器类型:

  fanout :将所有消息转发至交换器绑定的所有队列中。

  direct :它会把消息路由到那些 BindingKey 和 RoutingKey完全匹配的队列中。

  topic :它与 direct 类型的交换器相似,也是将消息路由到 BindingKey 和 RoutingKey 相匹配的队 列中,但这里的匹配规则有些不同,它约定:

  •     RoutingKey 为一个点号". "分隔的字符串(被点号" "分隔开的每一段独立的字符 串称为一个单词 ),如“com.rabbitmq.client”;  
  •     BindingKey 和 RoutingKey 一样也是点号". "分隔的字符串;
  •       BindingKey 中可以存在两种特殊字符串"*"和"#",用于做模糊匹配,其中"*"用于匹配一个单词,"#"用于匹配多规格单词(可以是零个)。

  header :该类型的交换器性能很差,而且也不实用,基本上不会看到它的存在。

生产者代码:

  nuget:添加RabbitMQ.Client;

1 IConnectionFactory conFactory = new ConnectionFactory//创建连接工厂对象
2 {
3             HostName = "*.*.*.*",//IP地址
4             Port = 5672,//端口号
5             UserName = "yan",//用户账号
6             Password = "yan"//用户密码
7 };
1 using (IConnection con = conFactory.CreateConnection())
 2 {
 3     using (IModel channel = con.CreateModel())
 4     {
 5         channel.ExchangeDeclare("ExchangeName", "direct", true, false, null); //声明交换器,dureable:是否持久化,autoDelete:是否自动删除
 6         channel.QueueDeclare("QueueName", true, false, false, null); //声明队列 dureable:是否持久化,exclusive:是否排他 autoDelete:是否自动删除
 7         channel.QueueBind("QueueName", "ExchangeName", "RoutingKey", null);
 8         var properties = channel.CreateBasicProperties();
 9         properties.DeliveryMode = 2;  //消息持久化
10         channel.BasicPublish("ExchangeName", "RoutingKey", properties, Encoding.UTF8.GetBytes("HelloWord")); //发布消息
11      }
12 }

声明交换器参数:

  durable: 设置是否持久化。 durable 设置为 true 表示持久化, 反之是非持久化。持 久化可以将交换器存盘,在服务器重启 的时候不会丢失相关信息。 

  autoDelete: 设置是否自动删除。 autoDelete 设置为 true 则表示自动删除。自动 删除的前提是至少有一个队列或者交换器与这个交换器绑定, 之后所有与这个交换器绑
定的队列或者交换器都与此解绑。注意不能错误地把这个参数理解为: "当与此交换器 连接的客户端都断开时, RabbitMQ 会自动删除本交换器"。

  声明队列参数:

  durable: 设置是否持久化。为 true 则设置队列为持久化。持久化的队列会存盘,在 服务器重启的时候可以保证不丢失相关信息。

  exclusive: 设置是否排他。为 true 则设置队列为排他的。如果一个队列被声明为排 他队列,该队列仅对首次声明它的连接可见,并在连接断开时自动删除。这里需要注意 三点:排他队列是基于连接(Connection) 可见的,同一个连接的不同信道 (Channel) 是可以同时访问同一连接创建的排他队列; "首次"是指如果一个连接己经声明了一个排他队列,其他连接是不允许建立同名的排他队列的,这个与普通队列不同:即使该队列是持久化的,一旦连接关闭或者客户端退出,该排他队列都会被自动删除,这种队列 适用于一个客户端同时发送和读取消息的应用场景。

  autoDelete: 设置是否自动删除。为 true 则设置队列为自动删除。自动删除的前提是: 至少有一个消费者连接到这个队列,之后所有与这个队列连接的消费者都断开时,才会自动删除。不能把这个参数错误地理解为: "当连接到此队列的所有客户端断开时,这 个队列自动删除",因为生产者客户端创建这个队列,或者没有消费者客户端与这个队列连接时,都不会自动删除这个队列。

消费者代码:

1 IConnectionFactory conFactory = new ConnectionFactory//创建连接工厂对象
 2 {
 3           HostName = "*.*.*.*",//IP地址
 4           Port = 5672,//端口号
 5           UserName = "yan",//用户账号
 6           Password = "yan"//用户密码
 7 };
 8 using (IConnection con = conFactory.CreateConnection())
 9 {
10      using (IModel channel = con.CreateModel())
11      {
12            channel.ExchangeDeclare("ExchangeName", "direct", true, false, null); //声明交换器,dureable:是否持久化,autoDelete:是否自动删除
13            channel.QueueDeclare("QueueName", true, false, false, null); //声明队列 dureable:是否持久化,exclusive:是否排他 autoDelete:是否自动删除
14            channel.QueueBind("QueueName", "ExchangeName", "RoutingKey", null);
15            EventingBasicConsumer consumer = new EventingBasicConsumer(channel); //声明事件基本消费者
16            consumer.Received += (ch,ea) => {
17              var message = Encoding.UTF8.GetString(ea.Body);
18              Console.WriteLine($"收到消息: {message}"); //消费消息
19              channel.BasicAck(ea.DeliveryTag, false); //确认该消息已被消费
20            };
21            channel.BasicConsume("QueueName", false, consumer); //启动消费者,并设置为手动应答消息
22      }
23 }

  channel.BasicReject(ea.DeliveryTag,false);  //拒绝消息 requeue true:消息重新存入队列。false:立即会把消息从队列中移除。  

  为了保证消息从队列可靠地达到消费者, RabbitMQ 提供了消息确认机制( message acknowledgement)。 消费者在订阅队列时,可以指定 autoAck 参数,当 autoAck 等于 false 时, RabbitMQ 会等待消费者显式地回复确认信号后才从内存(或者磁盘)中移去消息(实质上是先打上删除标记,之后再删除)。当 autoAck 等于 true 时, RabbitMQ 会自动把发送出去的 消息置为确认,然后从内存(或者磁盘)中删除,而不管消费者是否真正地消费到了这些消息。

  采用消息确认机制后,只要设置 autoAck 参数为 false,消费者就有足够的时间处理消息 (任务),不用担心处理消息过程中消费者进程挂掉后消息丢失的问题, 因为 RabbitMQ 会一直 等待持有消息直到消费者显式调用 Basic.Ack 命令为止。
  当 autoAck 参数置为 false,对于 RabbitMQ 服务端而言,队列中的消息分成了两个部分: 一部分是等待投递给消费者的消息:一部分是己经投递给消费者,但是还没有收到消费者确认信号的消息。如果 RabbitMQ 一直没有收到消费者的确认信号,并且消费此消息的消费者己经 断开连接,则 RabbitMQ 会安排该消息重新进入队列,等待投递给下一个消费者,当然也有可能还是原来的那个消费者。

  RabbitMQ 不会为未确认的消息设置过期时间,它判断此消息是否需要重新投递给消费者的唯一依据是消费该消息的消费者连接是否己经断开,这么设计的原因是 RabbitMQ 允许消费者 消费一条消息的时间可以很久很久。

(0)

相关推荐

  • java之学习记录 7 - 2 - RabbitMQ安装及使用

    怎么用RabbitMQ 想要安装RabbitMQ,必须先安装erlang语言环境,类似安装tomcat,必须先安装JDK 查看匹配的版本:https://www.rabbitmq.com/which- ...

  • 把Redis当作队列来用,真的合适吗?

    阅读本文大约需要 15 分钟. 大家好,我是 Kaito. 我经常听到很多人讨论,关于「把 Redis 当作队列来用是否合适」的问题. 有些人表示赞成,他们认为 Redis 很轻量,用作队列很方便. ...

  • 万字长文书写RabbitMQ最全见解!以后再也不用到处去搜索了呀!

    典型应用场景 1.跨系统的异步通信 人民银行二代支付系统,使用重量级消息队列 IBM MQ,异步,解耦,削峰都有体现. 2.应用内的同步变成异步 秒杀:自己发送给自己 3.基于Pub/Sub模型实现的 ...

  • 消息队列全面了解(二)

    一.再谈消息队列的应用场景 1.异步处理:例如短信通知.终端状态推送.App推送.用户注册等 2.数据同部:业务数据推送同步 3.重试补偿:记账失败重试 4.系统解耦:通讯上下行.终端异常监控.分布式 ...

  • (1条消息) RabbitMQ六种通信模式介绍

    文章目录 一.简介 二.代码实现 三.程序运行效果 四.模式总结 一.简介 本篇博客所讲的为RabbitMQ六种通信模式之一的发布与订阅模式,官网给出的图如下所示: 简单模式与工作模式2个案例中,只有 ...

  • Kafka、RocketMQ、Pulsar全方位对比

    消息队列也通常称为消息中间件,提到消息队列,大部分互联网人或多或少都听过该名词.对于后端工程师而言,更是日常开发中必备的一项技能. 图片来自 Pexels 随着大数据时代的到来,Apache 旗下的 ...

  • C#队列学习笔记:RabbitMQ安装及使用

    一.环境搭建 1.1.由于RabbitMQ是使用Erlang语言开发的,因此要安装Erlang运行时环境,下载地址:Erlang官网下载  CSDN分享下载 1.2.去RabbitMQ官网下载Rabb ...

  • C#队列学习笔记:RabbitMQ优先级队列

    一.引言 在具体业务中可能会遇到一些要提前处理的消息,比如普通客户的消息按先进先出的顺序处理,Vip客户的消息要提前处理.在RabbitMQ中,消息优先级的实现方式是:在声明queue时设置队列的x- ...

  • C#队列学习笔记:RabbitMQ延迟队列

    一.引言 日常生活中,很多的APP都有延迟队列的影子.比如在手机淘宝上,经常遇到APP派发的限时消费红包,一般有几个小时或24小时不等.假如在红包倒计时的过程中,没有消费掉红包的话,红包会自动失效.假 ...

  • [学习笔记] RabbitMQ的安装使用

    安装 使用命令行安装,会自动管理依赖(推荐): choco install rabbitmq 安装包安装: 以管理员身份安装64位的 Erlang. 下载并安装 RabbitMQ 服务.下载地址. R ...

  • 一则公报案例学习笔记:对修改股东出资期限应否适用资本多数决规则的思考|审判研究

    一.问题的提出 2021年第3期<最高人民法院公报案例>刊登了鸿大(上海)投资管理有限公司与姚锦城公司决议纠纷上诉案,裁判要旨为:"公司股东滥用控股地位,以多数决方式通过修改出资 ...

  • JAVA多线程学习笔记整理

    多线程: 三种创建方法 继承Thread类,以线程运行内容重写run方法,创建Thread对象并用start方法启动该线程. (匿名内部类) (Lambda表达式) 实现Runable接口,以线程运行 ...

  • 周哥学习笔记(2021.5.8)

    心理界限存在的意义,正是为了帮助人们控制情绪进入的量,不至于太过冷漠或太过投入,让我们保持一个合适的距离与外界互动. 人没有办法只通过吸收变得更美好和丰富,它必须通过大胆的碰撞和创造.如果不能保持足够 ...

  • 【学习笔记】控制角色移动的N种方法,但都离不开重复执行

    [学习笔记]控制角色移动的N种方法,但都离不开重复执行 今天我们讲一下控制角色移动的多种方法,因为缺少操作实例,希望课下同学们结合例子好好练习. 首先,我们说一下控制角色移动的多种方法.最比较常见的就 ...

  • 胡希恕伤寒论学习笔记——42

    42.太阳病,外证未解,脉浮弱者,当以汗解,宜桂枝汤. 字面意思是说:太阳病,外证依然存在,脉是浮弱的,治疗上依然需要通过出汗的方法,这时应该用桂枝汤一类的方剂. "宜"字说明不是 ...