Spring 对Apache Kafka的支持与集成
1. 引言
Apache Kafka 是一个分布式的、容错的流处理系统。在本文中,我们将介绍Spring对Apache Kafka的支持,以及原生Kafka Java客户端Api 所提供的抽象级别。
Spring Kafka 通过 @KafkaListener 注解,带来了一个简单而典型的 Spring 模板编程模型,它还带有一个 KafkaTemplate 和消息驱动的 POJO 。
2. 安装和设置
要下载和安装Kafka,请参考官方指南。然后还需要在 pom.xml
文件中添加 spring-kafka
:
<dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> <version>2.3.7.RELEASE</version></dependency>
新建一个 Spring Boot 示例应用程序,以默认配置启动。
3. 配置 Topics
以前我们使用命令行工具在 Kafka
中创建 topic
,例如:
$ bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic mytopic
但是随着 AdminClient 在Kafka中的引入,我们现在可以通过编程来创建 Topic
。
如下代码,添加 KafkAdmin
bean 到 Spring中,它将自动为 NewTopic
类的所有 bean
添加 topic
:
@Configurationpublic class KafkaTopicConfig { @Value(value = "${kafka.bootstrapAddress}") private String bootstrapAddress; @Bean public KafkaAdmin kafkaAdmin() { Map<String, Object> configs = new HashMap<>(); configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress); return new KafkaAdmin(configs); } @Bean public NewTopic topic1() { return new NewTopic("developlee", 1, (short) 1); }}
4. 消息生成
要创建消息,首先需要配置 ProducerFactory ,并设置创建 Kafka Producer 实例的策略,然后使用 KafkaTemplate
。 KafkaTemplate
包装了 Producer
实例,并提供向 Kafka Topic
发送消息的简便方法。
在整个应用程序上下文中使用单个实例将提供更高的性能。因此推荐使用一个 Producer
实例。该实例是线程安全的,所以 KakfaTemplate
实例也是线程安全的,
4.1. Producer 配置
@Configurationpublic class KafkaProducerConfig { @Bean public ProducerFactory<String, String> producerFactory() { Map<String, Object> configProps = new HashMap<>(); configProps.put( ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress); configProps.put( ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); configProps.put( ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); return new DefaultKafkaProducerFactory<>(configProps); } @Bean public KafkaTemplate<String, String> kafkaTemplate() { return new KafkaTemplate<>(producerFactory()); }}
4.2. 消息发布
我们使用 KafkaTemplate
来发布消息:
@Autowiredprivate KafkaTemplate<String, String> kafkaTemplate; public void sendMessage(String msg) { kafkaTemplate.send(topicName, msg);}
send
API 返回 ListenableFuture
对象。如果我们想阻塞发送线程并获得关于发送消息的结果,我们可以调用ListenableFuture
对象的 get
API。线程将会等待结果,但它会降低生产者的速度。
Kafka是一个快速流处理平台。因此,最好异步处理结果,这样后续消息就无需等待前一条消息的结果。我们可以通过回调来实现:
public void sendMessage(String message) { ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(topicName, message); future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() { @Override public void onSuccess(SendResult<String, String> result) { System.out.println("Sent message=[" + message + "] with offset=[" + result.getRecordMetadata().offset() + "]"); } @Override public void onFailure(Throwable ex) { System.out.println("Unable to send message=[" + message + "] due to : " + ex.getMessage()); } });}
5. 消息消费
5.1. 消费者配置
对于消费消息,我们需要配置一个 ConsumerFactory
和一个 KafkaListenerContainerFactory
。
一旦这些bean在Spring Bean工厂中可用,就可以使用 @KafkaListener
注解配置基于POJO的消费者。
配置类上需要添加 @EnableKafka
注解,以便能够检测Spring
管理的bean上的 @KafkaListener
注解:
@EnableKafka@Configurationpublic class KafkaConsumerConfig { @Bean public ConsumerFactory<String, String> consumerFactory() { Map<String, Object> props = new HashMap<>(); props.put( ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress); props.put( ConsumerConfig.GROUP_ID_CONFIG, groupId); props.put( ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); props.put( ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); return new DefaultKafkaConsumerFactory<>(props); } @Bean public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() { ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(consumerFactory()); return factory; }}
5.2. 消息消费
@KafkaListener(topics = "topicName", groupId = "foo")public void listenGroupFoo(String message) { System.out.println("Received Message in group foo: " + message);}
可以为一个 topic 实现多个 listener,每个topic 都有不同的组Id。此外,一个消费者可以监听来自不同 topic 的消息:
@KafkaListener(topics = "topic1, topic2", groupId = "foo")
Spring 还支持使用 listener 中的 @Header 注解检索一个或多个消息标题:
@KafkaListener(topics = "topicName")public void listenWithHeaders( @Payload String message, @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition) { System.out.println( "Received Message: " + message" + "from partition: " + partition);}
5.3. 消费来自特定分区的消息
注意到,我们只使用一个分区创建了 topic “developlee”。但是,对于具有多个分区的主题,@KafkaListener 可以显式订阅具有初始偏移量 topic 的特定分区:
@KafkaListener( topicPartitions = @TopicPartition(topic = "topicName", partitionOffsets = { @PartitionOffset(partition = "0", initialOffset = "0"), @PartitionOffset(partition = "3", initialOffset = "0")}), containerFactory = "partitionsKafkaListenerContainerFactory")public void listenToPartition( @Payload String message, @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition) { System.out.println( "Received Message: " + message" + "from partition: " + partition);}
由于 initialOffset 已被发送到该 listener 中的分区0,因此每次初始化该 listener
时,将重新使用以前从分区0和分区3消耗的所有消息。如果不需要设置偏移量,我们可以使用 @TopicPartition 注解的 partitions 属性只设置没有偏移量的分区:
@KafkaListener(topicPartitions = @TopicPartition(topic = "topicName", partitions = { "0", "1" }))
5.4. 为Listener添加消息过滤器
通过添加自定义过滤器,可以将 listener
配置为使用特定类型的消息。这可以通过将 RecordFilterStrategy
设置为 KafkaListenerContainerFactory
来完成:
@Beanpublic ConcurrentKafkaListenerContainerFactory<String, String> filterKafkaListenerContainerFactory() { ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(consumerFactory()); factory.setRecordFilterStrategy( record -> record.value().contains("World")); return factory;}
然后可以将 listener
配置为使用此容器工厂:
@KafkaListener( topics = "topicName", containerFactory = "filterKafkaListenerContainerFactory")public void listenWithFilter(String message) { System.out.println("Received Message in filtered listener: " + message);}
在这个 listener
中,所有与过滤器匹配的消息都将被丢弃。
6. 自定义消息转换器
到目前为止,我们只讨论了字符串作为消息发送和接收的对象。但是,我们也可以发送和接收定制的Java对象。这需要在 ProducerFactory
中配置适当的序列化器,并在 ConsumerFactory
中配置反序列化器。
让我们看一个简单的bean,并将以消息的形式发送它:
public class Greeting { private String msg; private String name; // standard getters, setters and constructor}
6.1. 生产自定义消息
在本例中,我们将使用 JsonSerializer
。我们看看 ProducerFactory
和 KafkaTemplate
的代码:
@Beanpublic ProducerFactory<String, Greeting> greetingProducerFactory() { // ... configProps.put( ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class); return new DefaultKafkaProducerFactory<>(configProps);} @Beanpublic KafkaTemplate<String, Greeting> greetingKafkaTemplate() { return new KafkaTemplate<>(greetingProducerFactory());}
新的 KafkaTemplate
可用于发送 Greeting 消息:
kafkaTemplate.send(topicName, new Greeting("Hello", "World"));
6.2. 消费自定义消息
同样,我们修改 ConsumerFactory
和 KafkaListenerContainerFactory
来正确反序列化 Greeting 消息:
@Beanpublic ConsumerFactory<String, Greeting> greetingConsumerFactory() { // ... return new DefaultKafkaConsumerFactory<>( props, new StringDeserializer(), new JsonDeserializer<>(Greeting.class));} @Beanpublic ConcurrentKafkaListenerContainerFactory<String, Greeting> greetingKafkaListenerContainerFactory() { ConcurrentKafkaListenerContainerFactory<String, Greeting> factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(greetingConsumerFactory()); return factory;}
spring-kafka
JSON序列化器和反序列化器使用 Jackson 库,该库是 spring-kafka
项目的可选maven依赖项。我们也把它加到 pom.xml 文件:
<dependency> <groupId>com.fasterxml.jackson.core</groupId> <artifactId>jackson-databind</artifactId> <version>2.9.7</version></dependency>
建议不要使用 Jackson 的最新版本,而是使用 pom.xml 文件 中 spring-kafka
的版本。
最后,我们需要编写一个 listener 来 消费 Greeting 消息:
@KafkaListener( topics = "topicName", containerFactory = "greetingKafkaListenerContainerFactory")public void greetingListener(Greeting greeting) { // process greeting message}
7. 结语
在本文中,我们介绍了Apache Kafka 和 Spring 集成的基础知识,且简要介绍了用于发送和接收消息的类。
本文的完整源代码可以在GitHub上找到. 在执行代码之前,请确保服务器正在运行 Kafka。