掌握 Kafka,看这篇就足够了
以时间复杂度为 O(1) 的方式提供消息持久化能力,即使对 TB 级以上数据也能保证常数时间的访问性能。
高吞吐率。即使在非常廉价的商用机器上也能做到单机支持每秒 100K 条消息的传输。
支持 Kafka Server 间的消息分区,及分布式消费,同时保证每个 Partition 内的消息顺序传输。
同时支持离线数据处理和实时数据处理。
支持在线水平扩展。
建立实时流数据管道,以可靠地在系统或应用程序之间获取数据。
构建实时流应用程序,以转换或响应数据流。
Kafka 在一个或多个可以跨越多个数据中心的服务器上作为集群运行。
Kafka 集群将记录流存储在称为主题的类别中。
每个记录由一个键,一个值和一个时间戳组成。
顺序读写:Kafka 将消息写入到了分区 Partition 中,而分区中的消息又是顺序读写的。顺序读写要快于随机读写。
零拷贝:生产者、消费者对于 Kafka 中的消息是采用零拷贝实现的。
批量发送:Kafka 允许批量发送模式。
消息压缩:Kafka 允许对消息集合进行压缩。
快速持久化,可以在 O(1) 的系统开销下进行消息持久化。
高吞吐,在一台普通的服务器上既可以达到 10W/s 的吞吐速率。
完全的分布式系统,Broker、Producer、Consumer 都原生自动支持分布式,自动实现负载均衡。
支持 Hadoop 数据并行加载,对于像 Hadoop 的一样的日志数据和离线分析系统,但又要求实时处理的限制,这是一个可行的解决方案。
ISR,In-Sync Replicas,是指副本同步列表。ISR 列表是由 Leader 负责维护。
AR,Assigned Replicas,指某个 Partition 的所有副本, 即已分配的副本列表。
OSR,Outof-Sync Replicas,即非同步的副本列表。
AR=ISR+OSR
HW,HighWatermark,高水位,表示 Consumer 可以消费到的最高 Partition 偏移量。HW 保证了 Kafka 集群中消息的一致性。确切地说,是保证了 Partition 的 Follower 与 Leader 间数 据的一致性。
LEO,Log End Offset,日志最后消息的偏移量。消息是被写入到 Kafka 的日志文件中的, 这是当前最后一个写入的消息在 Partition 中的偏移量。
对于 Leader 新写入的消息,Consumer 是不能立刻消费的。Leader 会等待该消息被所有 ISR 中的 Partition Follower 同步后才会更新 HW,此时消息才能被 Consumer 消费。
Producer 先从 ZooKeeper 中找到该 Partition 的 Leader。
Producer将消息发送给该 Leader。
Leader 将消息接入本地的 log,并通知 ISR 的 Followers。
ISR 中的 Followers 从 Leader 中 Pull 消息, 写入本地 log 后向 Leader 发送 Ack。
Leader 收到所有 ISR 中的 Followers 的 Ack 后,增加 HW 并向 Producer 发送 Ack,表示消息写入成功。
若指定了 Partition,则直接写入到指定的 Partition。
若未指定 Partition 但指定了 Key,则通过对 Key 的 Hash 值与 Partition 数量取模,该取模。
结果就是要选出的 Partition 索引。
若 Partition 和 Key 都未指定,则使用轮询算法选出一个 Partition。
在传输过程中会出现消息丢失。
在 Broker 内部会出现消息丢失。
会出现写入到 Kafka 中的消息的顺序与生产顺序不一致的情况。
Consumer 向 Broker 提交连接请求,其所连接上的 Broker 都会向其发送Broker Controller 的通信 URL,即配置文件中的 Listeners 地址。
当 Consumer 指定了要消费的 Topic 后,会向 Broker Controller 发送消费请求。
Broker Controller 会为 Consumer 分配一个或几个 Partition Leader,并将该 Partition 的当前 Offset 发送给 Consumer。
Consumer 会按照 Broker Controller 分配的 Partition 对其中的消息进行消费。
当 Consumer 消费完该条消息后,Consumer 会向 Broker 发送一个消息已经被消费反馈,即该消息的 Offset。
在 Broker 接收到 Consumer 的 Offset 后,会更新相应的 __consumer_offset 中。
以上过程会一直重复,知道消费者停止请求消费。
Consumer 可以重置 Offset,从而可以灵活消费存储在 Broker 上的消息。
cd /usr/local/src
wget http://mirrors.tuna.tsinghua.edu.cn/apache/kafka/2.4.0/kafka_2.11-2.4.0.tgz
mkdir /data/servers
tar xzvf kafka_2.11-2.4.0.tgz -C /data/servers/
cd /data/servers/kafka_2.11-2.4.0
确保每个机器上的id不一样
broker.id=0
配置服务端的监控地址
listeners=PLAINTEXT://192.168.51.128:9092
kafka 日志目录
log.dirs=/data/servers/kafka_2.11-2.4.0/logs
#kafka设置的partitons的个数
num.partitions=1
ZooKeeper的连接地址,如果有自己的 ZooKeeper 集群,请直接使用自己搭建的zookeeper集群
zookeeper.connect=192.168.51.128:2181
#创建对应的日志目录
mkdir -p /data/servers/kafka_2.11-2.4.0/logs/9092
mkdir -p /data/servers/kafka_2.11-2.4.0/logs/9093
mkdir -p /data/servers/kafka_2.11-2.4.0/logs/9094
#拷贝三份配置文件
cp server.properties server_9092.properties
cp server.properties server_9093.properties
cp server.properties server_9094.properties
#9092的id为0, 9093的id为1, 9094的id为2
broker.id=0
# 配置服务端的监控地址, 分别在不通的配置文件中写入不同的端口
listeners=PLAINTEXT://192.168.51.128:9092
# kafka 日志目录, 目录也是对应不同的端口
log.dirs=/data/servers/kafka_2.11-2.4.0/logs/9092
# kafka设置的partitons的个数
num.partitions=1
# ZooKeeper 的连接地址, 如果有自己的 ZooKeeper 集群, 请直接使用自己搭建的 ZooKeeper 集群
zookeeper.connect=192.168.51.128:2181
dataDir=/data/servers/zookeeper
server.1=192.168.51.128:2888:3888
#创建对应的日志目录
mkdir -p /data/servers/kafka_2.11-2.4.0/logs/9092
mkdir -p /data/servers/kafka_2.11-2.4.0/logs/9093
mkdir -p /data/servers/kafka_2.11-2.4.0/logs/9094
#拷贝三份配置文件
cp server.properties server_9092.properties
cp server.properties server_9093.properties
cp server.properties server_9094.properties
#9092的id为0, 9093的id为1, 9094的id为2
broker.id=0
# 配置服务端的监控地址, 分别在不通的配置文件中写入不同的端口
listeners=PLAINTEXT://192.168.51.128:9092
# kafka 日志目录, 目录也是对应不同的端口
log.dirs=/data/servers/kafka_2.11-2.4.0/logs/9092
# kafka设置的partitons的个数
num.partitions=1
# ZooKeeper 的连接地址, 如果有自己的 ZooKeeper 集群, 请直接使用自己搭建的 ZooKeeper 集群
zookeeper.connect=192.168.51.128:2181
dataDir=/data/servers/zookeeper
server.1=192.168.51.128:2888:3888
echo '1'> /data/servers/zookeeper/myid
cd /data/servers/kafka_2.11-2.4.0/bin
zookeeper-server-start.sh -daemon ../config/zookeeper.properties
netstat -anp |grep 2181
./kafka-server-start.sh -daemon ../config/server_9092.properties
./kafka-server-start.sh -daemon ../config/server_9093.properties
./kafka-server-start.sh -daemon ../config/server_9094.properties
--create:创建 topic
--delete:删除 topic
--alter:修改 topic 的名字或者 partition 个数
--list:查看 topic
--describe:查看 topic 的详细信息
--topic <String: topic>:指定 topic 的名字
--zookeeper <String: hosts>:指定 ZooKeeper 的连接地址参数提示并不赞成这样使用(DEPRECATED, The connection string for the zookeeper connection in the form host:port. Multiple hosts can be given to allow fail-over.)
--bootstrap-server <String: server to connect to>:指定 Kafka 的连接地址,推荐使用这个,参数的提示信息显示(REQUIRED: The Kafka server to connect to. In case of providing this, a direct Zookeeper connection won't be required.)。
--replication-factor <Integer: replication factor> : 对于每个 Partiton 的备份个数。(The replication factor for each partition in the topic being created. If not supplied, defaults to the cluster default.)
--partitions <Integer: # of partitions>:指定该 topic 的分区的个数。
cd /data/servers/kafka_2.11-2.4.0/bin
# 创建topic test1
kafka-topics.sh --create --bootstrap-server=192.168.51.128:9092,10.231.128.96:9093,192.168.51.128:9094 --replication-factor 1 --partitions 1 --topic test1
# 创建topic test2
kafka-topics.sh --create --bootstrap-server=192.168.51.128:9092,10.231.128.96:9093,192.168.51.128:9094 --replication-factor 1 --partitions 1 --topic test2
# 查看topic
kafka-topics.sh --list --bootstrap-server=192.168.51.128:9092,10.231.128.96:9093,192.168.51.128:9094
auto.create.topics.enable=true
delete.topic.enable=true
--topic <String: topic>:指定 topic
--timeout <Integer: timeout_ms>:超时时间
--sync:异步发送消息
--broker-list <String: broker-list>:官网提示: REQUIRED: The broker list string in the form HOST1:PORT1,HOST2:PORT2.
kafka-console-producer.sh --broker-list 192.168.51.128:9092,192.168.51.128:9093,192.168.51.128:9094 --topic test1
--topic <String: topic>:指定 topic
--group <String: consumer group id>:指定消费者组
--from-beginning:指定从开始进行消费, 如果不指定, 就从当前进行消费
--bootstrap-server:Kafka 的连接地址
kafka-console-consumer.sh --bootstrap-server 192.168.51.128:9092,192.168.51.128:9093,192.168.51.128:9094 --topic test1 ---beginning
第一种日志是我们的 Kafka 的启动日志,就是我们排查问题,查看报错信息的日志。
第二种日志就是我们的数据日志,Kafka 是我们的数据是以日志的形式存在存盘中的,我们第二种所说的日志就是我们的 Partiton 与 Segment。
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import java.util.Properties;
/**
* @ClassName MyKafkaProducer
* @Description TODO
* @Author lingxiangxiang
* @Date 3:37 PM
* @Version 1.0
**/
public class MyKafkaProducer {
private org.apache.kafka.clients.producer.KafkaProducer<Integer, String> producer;
public MyKafkaProducer() {
Properties properties = new Properties();
properties.put('bootstrap.servers', '192.168.51.128:9092,192.168.51.128:9093,192.168.51.128:9094');
properties.put('key.serializer', 'org.apache.kafka.common.serialization.IntegerSerializer');
properties.put('value.serializer', 'org.apache.kafka.common.serialization.StringSerializer');
// 设置批量发送
properties.put('batch.size', 16384);
// 批量发送的等待时间50ms, 超过50ms, 不足批量大小也发送
properties.put('linger.ms', 50);
this.producer = new org.apache.kafka.clients.producer.KafkaProducer<Integer, String>(properties);
}
public boolean sendMsg() {
boolean result = true;
try {
// 正常发送, test2是topic, 0代表的是分区, 1代表的是key, hello world是发送的消息内容
final ProducerRecord<Integer, String> record = new ProducerRecord<Integer, String>('test2', 0, 1, 'hello world');
producer.send(record);
// 有回调函数的调用
producer.send(record, new Callback() {
@Override
public void onCompletion(RecordMetadata recordMetadata, Exception e) {
System.out.println(recordMetadata.topic());
System.out.println(recordMetadata.partition());
System.out.println(recordMetadata.offset());
}
});
// 自己定义一个类
producer.send(record, new MyCallback(record));
} catch (Exception e) {
result = false;
}
return result;
}
}
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.RecordMetadata;
/**
* @ClassName MyCallback
* @Description TODO
* @Author lingxiangxiang
* @Date 3:51 PM
* @Version 1.0
**/
public class MyCallback implements Callback {
private Object msg;
public MyCallback(Object msg) {
this.msg = msg;
}
@Override
public void onCompletion(RecordMetadata metadata, Exception e) {
System.out.println('topic = ' + metadata.topic());
System.out.println('partiton = ' + metadata.partition());
System.out.println('offset = ' + metadata.offset());
System.out.println(msg);
}
}
import static java.lang.Thread.sleep;
/**
* @ClassName MyKafkaProducerTest
* @Description TODO
* @Author lingxiangxiang
* @Date 3:46 PM
* @Version 1.0
**/
public class MyKafkaProducerTest {
public static void main(String[] args) throws InterruptedException {
MyKafkaProducer producer = new MyKafkaProducer();
boolean result = producer.sendMsg();
System.out.println('send msg ' + result);
sleep(1000);
}
}
import kafka.utils.ShutdownableThread;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.util.Arrays;
import java.util.Collections;
import java.util.Properties;
/**
* @ClassName MyKafkaConsumer
* @Description TODO
* @Author lingxiangxiang
* @Date 4:12 PM
* @Version 1.0
**/
public class MyKafkaConsumer extends ShutdownableThread {
private KafkaConsumer<Integer, String> consumer;
public MyKafkaConsumer() {
super('KafkaConsumerTest', false);
Properties properties = new Properties();
properties.put('bootstrap.servers', '192.168.51.128:9092,192.168.51.128:9093,192.168.51.128:9094');
properties.put('group.id', 'mygroup');
properties.put('enable.auto.commit', 'true');
properties.put('auto.commit.interval.ms', '1000');
properties.put('session.timeout.ms', '30000');
properties.put('heartbeat.interval.ms', '10000');
properties.put('auto.offset.reset', 'earliest');
properties.put('key.deserializer', 'org.apache.kafka.common.serialization.IntegerDeserializer');
properties.put('value.deserializer', 'org.apache.kafka.common.serialization.StringDeserializer');
this.consumer = new KafkaConsumer<Integer, String>(properties);
}
@Override
public void doWork() {
consumer.subscribe(Arrays.asList('test2'));
ConsumerRecords<Integer, String>records = consumer.poll(1000);
for (ConsumerRecord record : records) {
System.out.println('topic = ' + record.topic());
System.out.println('partition = ' + record.partition());
System.out.println('key = ' + record.key());
System.out.println('value = ' + record.value());
}
}
}
/**
* @ClassName MyConsumerTest
* @Description TODO
* @Author lingxiangxiang
* @Date 4:23 PM
* @Version 1.0
**/
public class MyConsumerTest {
public static void main(String[] args) {
MyKafkaConsumer consumer = new MyKafkaConsumer();
consumer.start();
System.out.println('==================');
}
}
import kafka.utils.ShutdownableThread;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.util.Arrays;
import java.util.Collections;
import java.util.Properties;
/**
* @ClassName MyKafkaConsumer
* @Description TODO
* @Author lingxiangxiang
* @Date 4:12 PM
* @Version 1.0
**/
public class MyKafkaConsumer extends ShutdownableThread {
private KafkaConsumer<Integer, String> consumer;
public MyKafkaConsumer() {
super('KafkaConsumerTest', false);
Properties properties = new Properties();
properties.put('bootstrap.servers', '192.168.51.128:9092,192.168.51.128:9093,192.168.51.128:9094');
properties.put('group.id', 'mygroup');
// 这里要修改成手动提交
properties.put('enable.auto.commit', 'false');
// properties.put('auto.commit.interval.ms', '1000');
properties.put('session.timeout.ms', '30000');
properties.put('heartbeat.interval.ms', '10000');
properties.put('auto.offset.reset', 'earliest');
properties.put('key.deserializer', 'org.apache.kafka.common.serialization.IntegerDeserializer');
properties.put('value.deserializer', 'org.apache.kafka.common.serialization.StringDeserializer');
this.consumer = new KafkaConsumer<Integer, String>(properties);
}
@Override
public void doWork() {
consumer.subscribe(Arrays.asList('test2'));
ConsumerRecords<Integer, String>records = consumer.poll(1000);
for (ConsumerRecord record : records) {
System.out.println('topic = ' + record.topic());
System.out.println('partition = ' + record.partition());
System.out.println('key = ' + record.key());
System.out.println('value = ' + record.value());
//手动同步提交
consumer.commitSync();
}
}
}
import kafka.utils.ShutdownableThread;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.util.Arrays;
import java.util.Collections;
import java.util.Properties;
/**
* @ClassName MyKafkaConsumer
* @Description TODO
* @Author lingxiangxiang
* @Date 4:12 PM
* @Version 1.0
**/
public class MyKafkaConsumer extends ShutdownableThread {
private KafkaConsumer<Integer, String> consumer;
public MyKafkaConsumer() {
super('KafkaConsumerTest', false);
Properties properties = new Properties();
properties.put('bootstrap.servers', '192.168.51.128:9092,192.168.51.128:9093,192.168.51.128:9094');
properties.put('group.id', 'mygroup');
// 这里要修改成手动提交
properties.put('enable.auto.commit', 'false');
// properties.put('auto.commit.interval.ms', '1000');
properties.put('session.timeout.ms', '30000');
properties.put('heartbeat.interval.ms', '10000');
properties.put('auto.offset.reset', 'earliest');
properties.put('key.deserializer', 'org.apache.kafka.common.serialization.IntegerDeserializer');
properties.put('value.deserializer', 'org.apache.kafka.common.serialization.StringDeserializer');
this.consumer = new KafkaConsumer<Integer, String>(properties);
}
@Override
public void doWork() {
consumer.subscribe(Arrays.asList('test2'));
ConsumerRecords<Integer, String>records = consumer.poll(1000);
for (ConsumerRecord record : records) {
System.out.println('topic = ' + record.topic());
System.out.println('partition = ' + record.partition());
System.out.println('key = ' + record.key());
System.out.println('value = ' + record.value());
//手动同步提交
// consumer.commitSync();
//手动异步提交
// consumer.commitAsync();
// 带回调公共的手动异步提交
consumer.commitAsync((offsets, e) -> {
if(e != null) {
System.out.println('提交次数, offsets = ' + offsets);
System.out.println('exception = ' + e);
}
});
}
}
}
<!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.1.1</version>
</dependency>
spring.kafka.bootstrap-servers = 192.168.51.128:9092,10.231.128.96:9093,192.168.51.128:9094
spring.kafka.producer.acks = 0
spring.kafka.producer.key-serializer = org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer = org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.retries = 3
spring.kafka.producer.batch-size = 4096
spring.kafka.producer.buffer-memory = 33554432
spring.kafka.producer.compression-type = gzip
spring.kafka.consumer.group-id = mygroup
spring.kafka.consumer.auto-commit-interval = 5000
spring.kafka.consumer.heartbeat-interval = 3000
spring.kafka.consumer.key-deserializer = org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer = org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.auto-offset-reset = earliest
spring.kafka.consumer.enable-auto-commit = true
# listenner, 标识消费者监听的个数
spring.kafka.listener.concurrency = 8
# topic的名字
kafka.topic1 = topic1
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.core.KafkaTemplate;
@Service
@Slf4j
public class MyKafkaProducerServiceImpl implements MyKafkaProducerService {
@Resource
private KafkaTemplate<String, String> kafkaTemplate;
// 读取配置文件
@Value('${kafka.topic1}')
private String topic;
@Override
public void sendKafka() {
kafkaTemplate.send(topic, 'hell world');
}
}
@Component
@Slf4j
public class MyKafkaConsumer {
@KafkaListener(topics = '${kafka.topic1}')
public void listen(ConsumerRecord<?, ?> record) {
Optional<?> kafkaMessage = Optional.ofNullable(record.value());
if (kafkaMessage.isPresent()) {
log.info('----------------- record =' + record);
log.info('------------------ message =' + kafkaMessage.get());
}