asp .net core发布订阅kafka
Kafka是一种高吞吐量的分布式发布订阅消息系统,有如下特性:
通过O的磁盘数据结构提供消息的持久化,这种结构对于即使数以TB的消息存储也能够保持长时间的稳定性能。
高吞吐量:即使是非常普通的硬件Kafka也可以支持每秒数百万 [2] 的消息。
支持通过Kafka服务器和消费机集群来分区消息。
支持Hadoop并行数据加载。
Kafka通过官网发布了最新版本2.3.0
相关术语介绍
Broker
Kafka集群包含一个或多个服务器,这种服务器被称为brokerTopic
每条发布到Kafka集群的消息都有一个类别,这个类别被称为Topic。(物理上不同Topic的消息分开存储,逻辑上一个Topic的消息虽然保存于一个或多个broker上但用户只需指定消息的Topic即可生产或消费数据而不必关心数据存于何处)Partition
Partition是物理上的概念,每个Topic包含一个或多个Partition.Producer
负责发布消息到Kafka brokerConsumer
消息消费者,向Kafka broker读取消息的客户端。Consumer Group
每个Consumer属于一个特定的Consumer Group(可为每个Consumer指定group name,若不指定group name则属于默认的group)。
在这里我们用了一个第三方库叫Confluent.kafka,在nuget上搜索一下就出来了,感谢原作者。
新建一个 .net core类库项目
安装第三方依赖库,如下图所示:
新建一个SUPKafkaTopicConsumer类
这是用来创建并初始化消费者,接下来看看这个类里面包含了什么。
首先声明一个委托,用来接收订阅消息
public delegate void OnReceivedHandle(object data);
初始化消费者,构造函数中传入kafka地址,以及要订阅的组groupId,另外注入了log4net记录日志信息。
init()方法用来初始化,新建一个消费者,具体代码如下。
public class SUPKafkaTopicConsumer<TKey, TValue> { private IConsumer<TKey, TValue> consumer; private SUPLogger logger_; private string BootStrapServer; private string GroupId; public SUPKafkaTopicConsumer(string bootStrapServer, string groupId, SUPLogger logger = null) { BootStrapServer = bootStrapServer; GroupId = groupId; logger_ = logger; } public bool Init() { try { var conf = new ConsumerConfig { GroupId = GroupId, BootstrapServers = BootStrapServer, AutoOffsetReset = AutoOffsetReset.Earliest, EnableAutoCommit = false // 设置非自动偏移,业务逻辑完成后手动处理偏移,防止数据丢失 }; consumer = new ConsumerBuilder<TKey, TValue>(conf) .SetErrorHandler((_, e) => Console.WriteLine($"Error: {e.Reason}")) .Build(); return true; } catch (Exception ex) { throw; } }
定义回调事件,用以处理用户自定义方法。
public event OnReceivedHandle onReceivedHandle;
定义一个订阅的方法,传入topic,以及是否需要提交偏移量。
其实看init()方法中我把EnableAutoCommit=false,取消了自动提交,让应用程序决定何时提交 偏移量,为什么这么做呢?
自动提交虽然方便,但是也有一些弊端,自动提交的弊端是通过间隔时间。 一般是默认5s提交时间间隔,在最近一次提交之后的 3s发生了再均衡,再均衡之后,消费者从最后一次提交的偏移量位置开始读取消息。这个时候偏移量已经落后 了 3s,所以在这 3s 内到达的消息会被重复处理。可以通过修改提交时间间隔来更频繁地提交偏移量,减小可能出现重复消息的时间窗,不过这种情况是无也完全避免的 。
大部分开发者通过控制偏移量提交时间来消除丢失消息的可能性,井在发生再均衡时减少 重复消息的数量。消费者 API提供了另一种提交偏移量的方式 , 开发者可以在必要的时候 提交当前偏移盘,而不是基于时间间隔。
public void Subscribe(string topic, bool isCommit) { try { if (consumer != null) { consumer.Subscribe(topic); while (true) { var consume = consumer.Consume(); if (onReceivedHandle != null) { onReceivedHandle(consume); if (isCommit) { consumer.Commit(consume); } } } } } catch (Exception ex) { //consumer.Close(); throw ex; } }
取消订阅
public void UnSubscribe() { if (consumer != null) { consumer.Unsubscribe(); } }
新建生产者类
首先定义了ISUPKafkaProducer<Tkey, TValue>接口,包含四个方法
public interface ISUPKafkaProducer<Tkey,TValue> { ISendResult Send(Tkey key, TValue value, string topic,Action<DeliveryReport<Tkey, TValue>> sendCallBack = null); ISendResult Send(Tkey key, TValue value, TopicPartition topicPartition, Action<DeliveryReport<Tkey, TValue>> sendCallBack = null); ISendResult AsyncSend(Tkey key, TValue value,string topic); ISendResult AsyncSend(Tkey key, TValue value, TopicPartition topicPartition); }
接口的实现,初始化过程类似消费者
internal class SUPKafkaTopicProducer<Tkey, TValue> : ISUPKafkaProducer<Tkey, TValue> { private IProducer<Tkey, TValue> producer; private SUPLogger logger_; private string m_bootStrapServer; public SUPKafkaTopicProducer(string bootStrapServer,SUPLogger logger = null) { m_bootStrapServer = bootStrapServer; logger_ = logger; } public bool Init() { try { var config = new ProducerConfig { BootstrapServers = m_bootStrapServer }; producer = new ProducerBuilder<Tkey, TValue>(config) .SetErrorHandler((producer, error) => { logger_.Fatal(string.Format("Kafka Error Handler {0},ErrorCode:{2},Reason:{3}", m_bootStrapServer, error.Code, error.Reason)); }) .SetLogHandler((producer, msg) => { logger_.Info(string.Format("Kafka Log Handler {0}-{1},Name:{2},Message:{3}", m_bootStrapServer, msg.Name, msg.Message)); }) .Build(); return true; } catch (Exception ex) { throw ex; } }
实现继承至ISUPKafkaProducer<Tkey, TValue>的方法
public ISendResult Send(Tkey key, TValue value,string topic, Action<DeliveryReport<Tkey, TValue>> sendCallBack = null) { try { if (producer != null) { var message = new Message<Tkey, TValue> { Value = value, Key = key }; producer.Produce(topic, message, sendCallBack); return new SendResult(true); } else { return new SendResult(true, "没有初始化生产者"); } } catch (Exception ex) { throw ex; } } public ISendResult Send(Tkey key, TValue value, TopicPartition topicPartition, Action<DeliveryReport<Tkey, TValue>> sendCallBack = null) { try { if (producer != null) { var message = new Message<Tkey, TValue> { Value = value, Key = key }; producer.Produce(topicPartition, message, sendCallBack); return new SendResult(true); } else { return new SendResult(true, "没有初始化生产者"); } } catch (Exception ex) { throw ex; } } public ISendResult AsyncSend(Tkey key, TValue value,string topic) { try { if (producer != null) { var message = new Message<Tkey, TValue> { Value = value, Key = key }; var deliveryReport = producer.ProduceAsync(topic, message); deliveryReport.ContinueWith(task => { Console.WriteLine("Producer: " + producer.Name + "\r\nTopic: " + topic + "\r\nPartition: " + task.Result.Partition + "\r\nOffset: " + task.Result.Offset); }); producer.Flush(TimeSpan.FromSeconds(10)); return new SendResult(true); } else { return new SendResult(true, "没有初始化生产者"); } } catch (Exception ex) { throw ex; } } public ISendResult AsyncSend(Tkey key, TValue value, TopicPartition topicPartition) { try { if (producer != null) { var message = new Message<Tkey, TValue> { Value = value, Key = key }; var deliveryReport = producer.ProduceAsync(topicPartition, message); deliveryReport.ContinueWith(task => { Console.WriteLine("Producer: " + producer.Name + "\r\nTopic: " + topicPartition.Topic + "\r\nPartition: " + task.Result.Partition + "\r\nOffset: " + task.Result.Offset); }); producer.Flush(TimeSpan.FromSeconds(10)); return new SendResult(true); } else { return new SendResult(true, "没有初始化生产者"); } } catch (Exception ex) { throw ex; } }
新建一个SUPKafkaMessageCenter类
这个类是对外开放的,我们利用这个类来管理生产者和消费者,看下代码非常简单。
public static class SUPKafkaMessageCenter<Tkey, TValue> { private static SUPLogger logger = null; static SUPKafkaMessageCenter() { SUPLoggerManager.Configure(); logger = new SUPLogger("KafkaCenter"); } /// <summary> /// 创建生产者 /// </summary> /// <param name="bootstrapServer"></param> /// <param name="topicName"></param> /// <returns></returns> public static ISUPKafkaProducer<Tkey, TValue> CreateTopicProducer(string bootstrapServer) { if (string.IsNullOrEmpty(bootstrapServer)) { return null; } var producer = new SUPKafkaTopicProducer<Tkey, TValue>(bootstrapServer, logger); if (!producer.Init()) { return null; } return producer; } /// <summary> /// 创建消费者 /// </summary> /// <param name="bootstrapServer"></param> /// <param name="groupId"></param> /// <returns></returns> public static SUPKafkaTopicConsumer<Tkey, TValue> CreateTopicConsumer(string bootstrapServer, string groupId= "default-consumer-group") { if (string.IsNullOrEmpty(bootstrapServer)) { return null; } var consumer = new SUPKafkaTopicConsumer<Tkey, TValue>(bootstrapServer, groupId,logger); if (!consumer.Init()) { return null; } return consumer; }
测试
新建一个测试的控制台程序,调用代码如下
消费者
var consumer = SUPKafkaMessageCenter<string, string>.CreateTopicConsumer("localhost:9092"); //绑定接收信息,回调函数 consumer.onReceivedHandle += CallBack; var topics = new List<string>(); topics.Add("kafka-default-topic"); topics.Add("test"); //订阅主题 consumer.Subscribe(topics, false);
生产者
ISUPKafkaProducer<string, string> kafkaCenter = SUPKafkaMessageCenter<string, string>.CreateTopicProducer("localhost:9092");kafkaCenter.Send(i.ToString(), "", "kafka-default-topic",deliveryReport =>{...});
除了上面写的这些方法,其实对于kafka还有很多功能,比如topic的增删改查,我把它认为是管理类的,这里就不贴代码了。