Kafka 流程和API

11/2/2022 Kafka

# Kafka 流程和API

Kafka是一款消息中间件,消息中间件本质就是收消息与发消息,所以这节课我们会从一条消息开始生产出发,去了解生产端的运行流程,然后简单的了解一下broker的存储流程,最后这条消息是如何被消费者消费掉的。其中最核心的有以下内容。

1、Kafka客户端是如何去设计一个非常优秀的生产级的保证高吞吐的一个缓冲机制

2、消费端的原理:每个消费组的群主如何选择,消费组的群组协调器如何选择,分区分配的方法,分布式消费的实现机制,拉取消息的原理,offset提交的原理。

# 1. Kafka一条消息发送和消费的流程(非集群)

# 2. API(Kafka客户端) 使用

先创建工程引入依赖包

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>3.3.1</version>
</dependency>
1
2
3
4
5

# 2.1 简单入门案例 - 生产者

# 2.1.1 生产者

先创建一个主题,推荐在消息发送时创建对应的主题。当然就算没有创建主题,Kafka也能自动创建。

auto.create.topics.enable

是否允许自动创建主题。如果设为true,那么produce(生产者往主题写消息),consume(消费者从主题读消息)或者fetch metadata(任意客户端向主题发送元数据请求时)一个不存在的主题时,就会自动创建。缺省为true。

num.partitions

每个新建主题的分区个数(分区个数只能增加,不能减少 )。这个参数默认值是1(最新版本)

# 2.1.2 必选属性

创建生产者对象时有三个属性必须指定。

bootstrap.servers

该属性指定broker的地址清单,地址的格式为host:port。

清单里不需要包含所有的broker地址,生产者会从给定的broker里查询其他broker的信息。不过最少提供2个broker的信息(用逗号分隔,比如:127.0.0.1:9092,192.168.0.13:9092),一旦其中一个宕机,生产者仍能连接到集群上。

key.serializer

生产者接口允许使用参数化类型,可以把Java对象作为键和值传broker,但是broker希望收到的消息的键和值都是字节数组,所以,必须提供将对象序列化成字节数组的序列化器。

key.serializer必须设置为实现org.apache.kafka.common.serialization.Serializer的接口类

Kafka的客户端默认提供了ByteArraySerializer,IntegerSerializer,StringSerializer,也可以实现自定义的序列化器。

value.serializer

同 key.serializer。

# 2.1.3 日志输出配置

kafka日志输出内容较少,同时错误信息日志也较少,不方便排查相关问题,可以通过配置日志,可以打印详细信息便于查看

依赖

<dependency>
    <groupId>log4j</groupId>
    <artifactId>log4j</artifactId>
    <version>1.2.17</version>
</dependency>

<dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-api</artifactId>
    <version>1.7.25</version>
</dependency>

<dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-log4j12</artifactId>
    <version>1.7.25</version>
</dependency>
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17

resources下添加log4j.properties文件

log4j.rootLogger = info,console

log4j.appender.console = org.apache.log4j.ConsoleAppender
log4j.appender.console.Target = System.out
log4j.appender.console.layout = org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern =  %p %d{yyyy-MM-dd HH:mm:ss} %c - %m%n
1
2
3
4
5
6

# 2.1.4 三种发送方式

我们通过生成者的send方法进行发送。send方法会返回一个包含RecordMetadata的Future对象。RecordMetadata里包含了目标主题,分区信息和消息的偏移量。

发送并忘记

忽略send方法的返回值,不做任何处理。大多数情况下,消息会正常到达,而且生产者会自动重试,但有时会丢失消息。

public static void main(String[] args) {

    // 设置属性
    Properties properties = new Properties();
    // 指定连接的kafka服务器的地址  key值可以用 ProducerConfig类中的静态属性配置 ,
    // 多个服务器逗号隔开
    properties.put("bootstrap.servers","192.168.220.201:9092");
    // 设置String的序列化
    properties.put("key.serializer", StringSerializer.class);
    properties.put("value.serializer", StringSerializer.class);

    // 构建kafka生产者对象
    KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
    try {
        // 构建消息
        ProducerRecord<String,String> record = new ProducerRecord<String,String>("test", "key","hello");
        // 发送消息
        producer.send(record);

        System.out.println("message is sent.");
    }catch (Exception e) {
        e.printStackTrace();
    }finally {
        // 释放链接
        producer.close();
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27

同步发送

public static void main(String[] args) {

    // 设置属性
    Properties properties = new Properties();
    // 指定连接的kafka服务器的地址  key值可以用 ProducerConfig类中的静态属性配置 ,
    // 多个服务器逗号隔开
    properties.put("bootstrap.servers","192.168.220.201:9092");
    // 设置String的序列化
    properties.put("key.serializer", StringSerializer.class);
    properties.put("value.serializer", StringSerializer.class);

    // 构建kafka生产者对象
    KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
    try {
        // 构建消息
        ProducerRecord<String,String> record = new ProducerRecord<String,String>("test", "key","hello");
        // 发送消息
        Future<RecordMetadata> future =producer.send(record);
        RecordMetadata recordMetadata = future.get();
        if(null!=recordMetadata){
            System.out.println("offset:"+recordMetadata.offset()+","
                               +"partition:"+recordMetadata.partition());
        }

        System.out.println("message is sent.");
    }catch (Exception e) {
        e.printStackTrace();
    }finally {
        // 释放链接
        producer.close();
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32

异步发送

实现接口org.apache.kafka.clients.producer.Callback,然后将实现类的实例作为参数传递给send方法。

public static void main(String[] args) {

    // 设置属性
    Properties properties = new Properties();
    // 指定连接的kafka服务器的地址  key值可以用 ProducerConfig类中的静态属性配置 ,
    // 多个服务器逗号隔开
    properties.put("bootstrap.servers","192.168.220.201:9092");
    // 设置String的序列化
    properties.put("key.serializer", StringSerializer.class);
    properties.put("value.serializer", StringSerializer.class);

    // 构建kafka生产者对象
    KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
    try {
        // 构建消息
        ProducerRecord<String,String> record = new ProducerRecord<String,String>("test", "key","hello");
        // 发送消息
        // 发送消息
        producer.send(record, new Callback() {
            @Override
            public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                if (e == null){
                    // 没有异常,输出信息到控制台
                    System.out.println("offset:"+recordMetadata.offset()+"," +"partition:"+recordMetadata.partition());
                } else {
                    // 出现异常打印
                    e.printStackTrace();
                }
            }
        });

        System.out.println("message is sent.");
    }catch (Exception e) {
        e.printStackTrace();
    }finally {
        // 释放链接
        producer.close();
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39

# 2.2 简单入门案例 - 消费者

# 2.2.1 消费者

消费者的含义,同一般消息中间件中消费者的概念。在高并发的情况下,生产者产生消息的速度是远大于消费者消费的速度,单个消费者很可能会负担不起,此时有必要对消费者进行横向伸缩,于是我们可以使用多个消费者从同一个主题读取消息,对消息进行分流。

# 2.2.2 必选属性

创建消费者对象时一般有四个属性必须指定。

bootstrap.servers、value.Deserializer key.Deserializer 含义同生产者

# 2.2.3 可选属性

group.id 并非完全必需,它指定了消费者属于哪一个群组,但是创建不属于任何一个群组的消费者并没有问题。不过绝大部分情况我们都会使用群组消费

# 2.2.4 消费者群组

Kafka里消费者从属于消费者群组,一个群组里的消费者订阅的都是同一个主题,每个消费者接收主题一部分分区的消息。

如上图,主题T有4个分区,群组中只有一个消费者,则该消费者将收到主题T1全部4个分区的消息。

如上图,在群组中增加一个消费者2,那么每个消费者将分别从两个分区接收消息,上图中就表现为消费者1接收分区1和分区3的消息,消费者2接收分区2和分区4的消息。

如上图,在群组中有4个消费者,那么每个消费者将分别从1个分区接收消息。

但是,当我们增加更多的消费者,超过了主题的分区数量,就会有一部分的消费者被闲置,不会接收到任何消息。

往消费者群组里增加消费者是进行横向伸缩能力的主要方式。所以我们有必要为主题设定合适规模的分区,在负载均衡的时候可以加入更多的消费者。但是要记住,一个群组里消费者数量超过了主题的分区数量,多出来的消费者是没有用处的。

# 3. 序列化

创建生产者对象必须指定序列化器,默认的序列化器并不能满足我们所有的场景。我们完全可以自定义序列化器。只要实现org.apache.kafka.common.serialization.Serializer接口即可。

# 3.1 自定义序列化

创建用户对象

并实现 Serializable接口

public class User implements Serializable {

    private String name;

    private Integer age;

    @Override
    public String toString() {
        return "User{" +
                "name='" + name + '\'' +
                ", age=" + age +
                '}';
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public Integer getAge() {
        return age;
    }

    public void setAge(Integer age) {
        this.age = age;
    }

    public User(String name, Integer age) {
        this.name = name;
        this.age = age;
    }

    public User() {
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38

序列化

public class ObjectSerializer implements Serializer<Object> {
    @Override
    public void configure(Map<String, ?> map, boolean b) {
        System.out.println("configure");
    }

    @Override
    public byte[] serialize(String topic, Object data) {
        return SerializationUtils.serialize((Serializable) data);
    }

    @Override
    public void close() {
        System.out.println("close");
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16

反序列化

public class ObjectDeserializer implements Deserializer<Object> {
    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        System.out.println("configure");
    }

    @Override
    public Object deserialize(String topic, byte[] data) {
        return SerializationUtils.deserialize(data);
    }

    @Override
    public void close() {
        System.out.println("close");
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16

生产和消费

// 生产
public static void main(String[] args) {

    // 设置属性
    Properties properties = new Properties();
    // 指定连接的kafka服务器的地址  key值可以用 ProducerConfig类中的静态属性配置 ,
    // 多个服务器逗号隔开
    properties.put("bootstrap.servers","192.168.220.201:9092");
    // 设置String的序列化
    properties.put("key.serializer", StringSerializer.class);
    properties.put("value.serializer", ObjectSerializer.class);

    // 构建kafka生产者对象
    KafkaProducer<String, User> producer = new KafkaProducer<>(properties);
    try {
        // 构建消息
        ProducerRecord<String,User> record = new ProducerRecord<String,User>("test", "key",new User("张三",12));
        // 发送消息
        producer.send(record);

        System.out.println("message is sent.");
    }catch (Exception e) {
        e.printStackTrace();
    }finally {
        // 释放链接
        producer.close();
    }
}

// 消费
public static void main(String[] args) {
    // 设置属性
    Properties properties = new Properties();
    // 指定连接的kafka服务器的地址  key值可以用 ProducerConfig类中的静态属性配置 ,
    // 多个服务器逗号隔开
    properties.put("bootstrap.servers","192.168.220.201:9092");
    // 设置String的序列化
    properties.put("key.deserializer", StringDeserializer.class);
    properties.put("value.deserializer", ObjectDeserializer.class);
    //properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"earliest");
    properties.put("group.id","group05");

    // 构建kafka生产者对象
    KafkaConsumer<String, User> consumer = new KafkaConsumer<>(properties);
    try {
        // 订阅主题
        consumer.subscribe(Collections.singleton("test"));
        while (true) {
            ConsumerRecords<String, User> records = consumer.poll(Duration.ofSeconds(1L));
            for (ConsumerRecord<String, User> record : records) {
                String key = record.key();
                String topic = record.topic();
                int partition = record.partition();
                User value = record.value();
                long offset = record.offset();
                System.out.println("record begin =========");
                System.out.println("topic: " + topic);
                System.out.println("partition: " + partition);
                System.out.println("offset: " + offset);
                System.out.println("key: " + key);
                System.out.println("value: " + value);
                System.out.println("record end   =========");
            }
        }

    }finally {
        // 释放链接
        consumer.close();
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70

# 4. 分区

因为在Kafka中一个topic可以有多个partition,所以当一个生产发送消息,这条消息应该发送到哪个partition,这个过程就叫做分区。

当然,我们在新建消息的时候,我们可以指定partition,只要指定partition,那么分区器的策略则失效。

# 4.1 系统分区器

在我们的代码中可以看到,生产者参数中是可以选择分区器的。

# 4.1.1 DefaultPartitioner 默认分区策略

全路径类名:org.apache.kafka.clients.producer.internals.DefaultPartitioner

  • 如果消息中指定了分区,则使用它
  • 如果未指定分区但存在key,则根据序列化key使用murmur2哈希算法对分区数取模。
  • 如果不存在分区或key,则会使用粘性分区策略

采用默认分区的方式,键的主要用途有两个:

一,用来决定消息被写往主题的哪个分区,拥有相同键的消息将被写往同一个分区。

二,还可以作为消息的附加消息。

# 4.1.2 RoundRobinPartitioner 分区策略

全路径类名:org.apache.kafka.clients.producer.internals.RoundRobinPartitioner

  • 如果消息中指定了分区,则使用它
  • 将消息平均的分配到每个分区中。

即key为null,那么这个时候一般也会采用RoundRobinPartitioner

# 4.1.3 UniformStickyPartitioner 纯粹的粘性分区策略

他跟DefaultPartitioner 分区策略的唯一区别就是。

DefaultPartitionerd 如果有key的话,那么它是按照key来决定分区的,这个时候并不会使用粘性分区 UniformStickyPartitioner 是不管你有没有key, 统一都用粘性分区来分配

另外关于粘性分区策略

从客户端最新的版本上来看(3.3.1),有两个序列化器已经进入 弃用阶段。

但是这个客户端在3.1.0都还不是这样。关于粘性分区策略

如果感兴趣可以看下这篇文章

[https://bbs.huaweicloud.com/blogs/348729?utm_source=oschina&utm_medium=bbs-ex&utm_campaign=other&utm_content=content](

# 4.2 自定义分区器

我们完全可以去实现Partitioner接口,去实现有一个自定义的分区器

自定义分区类

public class UserDefinedPartitioner implements Partitioner {
    private AtomicInteger atomicInteger=new AtomicInteger(0);
    @Override
    public int partition(String topic, Object o, byte[] keyBytes, Object o1, byte[] bytes1, Cluster cluster) {
        //自定义分区规则
        int numPartitions = cluster.partitionsForTopic(topic).size();
        if(keyBytes==null || keyBytes.length==0){
            return atomicInteger.addAndGet(1) & Integer.MAX_VALUE% numPartitions;
        } else {
            return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
        }
    }

    @Override
    public void close() {
        System.out.println("close");
    }

    @Override
    public void configure(Map<String, ?> map) {
        System.out.println("============================== Configure ==============================");
        for (String s : map.keySet()) {
            System.out.println(s + " " + map.get(s));
        }
        System.out.println("============================== End ==============================");
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27

生产者配置自定义分区

public static void main(String[] args) {

    // 设置属性
    Properties properties = new Properties();
    // 指定连接的kafka服务器的地址  key值可以用 ProducerConfig类中的静态属性配置 ,
    // 多个服务器逗号隔开
    properties.put("bootstrap.servers","192.168.220.201:9092");
    // 设置String的序列化
    properties.put("key.serializer", StringSerializer.class);
    properties.put("value.serializer", StringSerializer.class);
    //设置自定义分区
    properties.put("partitioner.class",UserDefinedPartitioner.class);

    // 构建kafka生产者对象
    KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
    try {
        // 构建消息
        ProducerRecord<String,String> record = new ProducerRecord<String,String>("test",0, "key","hello");
        // 发送消息
        producer.send(record);

        System.out.println("message is sent.");
    }catch (Exception e) {
        e.printStackTrace();
    }finally {
        // 释放链接
        producer.close();
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29

# 5. 生产缓冲机制

客户端发送消息给kafka服务器的时候、消息会先写入一个内存缓冲中,然后直到多条消息组成了一个Batch,才会一次网络通信把Batch发送过去。主要有以下参数:

buffer.memory

设置生产者内存缓冲区的大小,生产者用它缓冲要发送到服务器的消息。如果数据产生速度大于向broker发送的速度,导致生产者空间不足,producer会阻塞或者抛出异常。缺省33554432 (32M)

buffer.memory: 所有缓存消息的总体大小超过这个数值后,就会触发把消息发往服务器。此时会忽略batch.size和linger.ms的限制。 buffer.memory的默认数值是32 MB,对于单个 Producer 来说,可以保证足够的性能。 需要注意的是,如果您在同一个JVM中启动多个 Producer,那么每个 Producer 都有可能占用 32 MB缓存空间,此时便有可能触发 OOM。

batch.size

当多个消息被发送同一个分区时,生产者会把它们放在同一个批次里。该参数指定了一个批次可以使用的内存大小,按照字节数计算。当批次内存被填满后,批次里的所有消息会被发送出去。但是生产者不一定都会等到批次被填满才发送,半满甚至只包含一个消息的批次也有可能被发送(linger.ms控制)。缺省16384(16k) ,如果一条消息超过了批次的大小,会写不进去。

linger.ms

指定了生产者在发送批次前等待更多消息加入批次的时间。它和batch.size以先到者为先。也就是说,一旦我们获得消息的数量够batch.size的数量了,他将会立即发送而不顾这项设置,然而如果我们获得消息字节数比batch.size设置要小的多,我们需要“linger”特定的时间以获取更多的消息。这个设置默认为0,即没有延迟。设定linger.ms=5,例如,将会减少请求数目,但是同时会增加5ms的延迟,但也会提升消息的吞吐量。

# 5.1 为何要设计缓冲机制

1、减少IO的开销(单个 ->批次)但是这种情况基本上也只是linger.ms配置>0的情况下才会有,因为默认inger.ms=0的,所以基本上有消息进来了就发送了,跟单条发送是差不多!!

2、减少Kafka中Java客户端的GC。

比如缓冲池大小是32MB。然后把32MB划分为N多个内存块,比如说一个内存块是16KB(batch.size),这样的话这个缓冲池里就会有很多的内存块。

你需要创建一个新的Batch,就从缓冲池里取一个16KB的内存块就可以了,然后这个Batch就不断的写入消息

下次别人再要构建一个Batch的时候,再次使用缓冲池里的内存块就好了。这样就可以利用有限的内存,对他不停的反复重复的利用。因为如果你的Batch使用完了以后是把内存块还回到缓冲池中去,那么就不涉及到垃圾回收了。

# 6. 消费者偏移量提交

一般情况下,我们调用poll方法的时候,broker返回的是生产者写入Kafka同时kafka的消费者提交偏移量,这样可以确保消费者消息消费不丢失也不重复,所以一般情况下Kafka提供的原生的消费者是安全的,但是事情会这么完美吗?

# 6.1 自动提交

最简单的提交方式是让消费者自动提交偏移量。 如果enable.auto.commit被设为 true,消费者会自动把从poll()方法接收到的最大偏移量提交上去。提交时间间隔由auto.commit.interval.ms控制,默认值是5s。

自动提交是在轮询里进行的,消费者每次在进行轮询时会检査是否该提交偏移量了,如果是,那么就会提交从上一次轮询返回的偏移量。

不过,在使用这种简便的方式之前,需要知道它将会带来怎样的结果。

假设我们仍然使用默认的5s提交时间间隔, 在最近一次提交之后的3s发生了再均衡,再均衡之后,消费者从最后一次提交的偏移量位置开始读取消息。这个时候偏移量已经落后了3s,所以在这3s内到达的消息会被重复处理。可以通过修改提交时间间隔来更频繁地提交偏移量, 减小可能出现重复消息的时间窗, 不过这种情况是无法完全避免的。

在使用自动提交时,每次调用轮询方法都会把上一次调用返回的最大偏移量提交上去,它并不知道具体哪些消息已经被处理了,所以在再次调用之前最好确保所有当前调用返回的消息都已经处理完毕(enable.auto.comnit被设为 true时,在调用 close()方法之前也会进行自动提交)。一般情况下不会有什么问题,不过在处理异常或提前退出轮询时要格外小心。

# 6.2 手动提交

参数配置

//取消自动提交
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false);
1
2

手动提交代码

手动提交方式有同步提交和异步提交,这里异步提交

/**
    * key:TopicPartition topic + 分区
     * value:OffsetAndMetadata offset偏移量信息
*/

Map<TopicPartition, OffsetAndMetadata> offsets=new HashMap<TopicPartition, OffsetAndMetadata>();

/**
        * 注意:提交的位置为下一个消费的位置
   */
offsets.put(new TopicPartition(record.topic(),record.partition()),new OffsetAndMetadata(record.offset() + 1));

consumer.commitAsync(offsets,new OffsetCommitCallback() {
    @Override
    public void onComplete(Map<TopicPartition, OffsetAndMetadata> map, Exception e) {
        System.out.println("提交成功!");
    }
});
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18

# 6.3 消费者的配置参数

earliest 当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费 latest 当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据 none topic各分区都存在已提交的offset时,从offset后开始消费;只要有一个分区不存在已提交的offset,则抛出异常

只要group.Id不变,不管auto.offset.reset 设置成什么值,都从上一次的消费结束的地方开始消费。

从头开始消费: consumer.group.id 换成新的; auto.offset.reset 设置成earliest

从尾开始消费 consumer.group.id 换成新的; auto.offset.reset 设置成latest

# 7. 自定义拦截器

# 7.1 创建自定义拦截器‘

public class UserDefinedProducerInterceptor implements ProducerInterceptor {
    @Override
    public ProducerRecord onSend(ProducerRecord record) {
        ProducerRecord producerRecord = new ProducerRecord(record.topic(), record.key(), record.value() + "782099197@qq.com");
        return producerRecord;
    }

    /**
     * 无论是正确还是异常都会进入该方法
     * @param recordMetadata
     * @param e
     */
    @Override
    public void onAcknowledgement(RecordMetadata recordMetadata, Exception e) {
        System.out.println("metadata:"+recordMetadata+",exception:"+e);
    }

    @Override
    public void close() {
        System.out.println("close");
    }

    @Override
    public void configure(Map<String, ?> map) {
        System.out.println("configure");
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27

# 7.2 生产者配置消费者

props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,UserDefinedProducerInterceptor.class);
1

# 7.3 7.2 消费者配置消费者

props.put(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG,UserDefinedProducerInterceptor.class);
1

# 8. ACK和Retries(存在数据重复问题)

Kafka生产者在发送完一个的消息之后,要求Broker在规定的额时间Ack应答答,如果没有在规定时间内应答,Kafka生产者会尝试n次重新发送消息。

acks=1 默认

  • acks=1 - Leader会将Record写到其本地日志中,但会在不等待所有Follower的完全确认的情况下做出响应。在这种情况下,如果Leader在确认记录后立即失败,但在Follower复制记录之前失败,则记录将丢失。
  • acks=0 - 生产者根本不会等待服务器的任何确认。该记录将立即添加到套接字缓冲区中并视为已发送。在这种情况下,不能保证服务器已收到记录。
  • acks=all - 这意味着Leader将等待全套同步副本确认记录。这保证了只要至少一个同步副本仍处于活动状态,记录就不会丢失。这是最有力的保证。这等效于acks = -1设置。

如果生产者在规定的时间内,并没有得到Kafka的Leader的Ack应答,Kafka可以开启reties机制。

request.timeout.ms = 30000 默认 retries = 2147483647 默认

数据重复问题

# 9. 幂等性

Kafka在0.11.0.0版本支持增加了对幂等的支持。幂等是针对生产者角度的特性。幂等可以保证上生产者发送的消息,不会丢失,而且不会重复。实现幂等的关键点就是服务端可以区分请求是否重复,过滤掉重复的请求。要区分请求是否重复的有两点:

唯一标识:要想区分请求是否重复,请求中就得有唯一标识。例如支付请求中,订单号就是唯一标识

记录下已处理过的请求标识:光有唯一标识还不够,还需要记录下那些请求是已经处理过的,这样当收到新的请求时,用新请求中的标识和处理记录进行比较,如果处理记录中有相同的标识,说明是重复记录,拒绝掉。

幂等又称为exactly once。要停止多次处理消息,必须仅将其持久化到Kafka Topic中仅仅一次。在初始化期间,kafka会给生产者生成一个唯一的ID称为Producer ID或PID。

PID和序列号与消息捆绑在一起,然后发送给Broker。由于序列号从零开始并且单调递增,因此,仅当消息的序列号比该PID / TopicPartition对中最后提交的消息正好大1时,Broker才会接受该消息。如果不是这种情况,则Broker认定是生产者重新发送该消息。

enable.idempotence= false 默认

注意:在使用幂等性的时候,要求必须开启retries=true和acks=all

# 9.1 生产者幂等性配置

// ACKS 设置为all
props.put(ProducerConfig.ACKS_CONFIG,"all");
//尝试次数设置3 一共四次
props.put(ProducerConfig.RETRIES_CONFIG,3);
//问答时间设置为1ms(测试)
props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG,1);

//开启kafka幂等性
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG,true);
//设置阻塞等待数量 设置为 1 确保顺序
props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION,1);
1
2
3
4
5
6
7
8
9
10
11

# 10. 事务控制

Kafka的幂等性,只能保证一条记录的在分区发送的原子性,但是如果要保证多条记录(多分区)之间的完整性,这个时候就需要开启kafka的事务操作。

在Kafka0.11.0.0除了引入的幂等性的概念,同时也引入了事务的概念。通常Kafka的事务分为 生产者事务Only、消费者&生产者事务。一般来说默认消费者消费的消息的级别是read_uncommited数据,这有可能读取到事务失败的数据,所有在开启生产者事务之后,需要用户设置消费者的事务隔离级别。

isolation.level = read_uncommitted 默认

该选项有两个值read_committed|read_uncommitted,如果开始事务控制,消费端必须将事务的隔离级别设置为read_committed

开启的生产者事务的时候,只需要指定transactional.id属性即可,一旦开启了事务,默认生产者就已经开启了幂等性。但是要求"transactional.id"的取值必须是唯一的,同一时刻只能有一个"transactional.id"存储在,其他的将会被关闭。

# 10.1 生产者事务

public class Producer {

    KafkaProducer<String, String> producer = null;

    /**
     * 创建kafka客户端
     */
    @Before
    public void before() {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"master:9092,node1:9092,node2:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());

        // ACKS 设置为all
        props.put(ProducerConfig.ACKS_CONFIG,"all");
        //尝试次数设置3 一共四次(可以去掉使用默认)
        props.put(ProducerConfig.RETRIES_CONFIG,3);
        //问答时间设置为1ms(测试)
        props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG,20000);

        //开启kafka幂等性
        props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG,true);
        //设置阻塞等待数量 设置为 1 确保顺序
        props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION,1);

        //配置kafka批处理大小
        props.put(ProducerConfig.BATCH_SIZE_CONFIG,1024);
        //等待5ms如果batch中的数据不足1024大小
        props.put(ProducerConfig.LINGER_MS_CONFIG,5);

        //必须配置事务ID,必须是唯一的
        props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG,"transaction-id " + UUID.randomUUID().toString());

        producer = new KafkaProducer<String, String>(props);
    }

    /**
     * 释放资源
     */
    @After
    public void after() {
        if(null != producer) producer.close();
    }

    @Test
    public void producer() throws InterruptedException {
        producer.initTransactions();
        try {
            producer.beginTransaction();
            for(int i = 0 ; i < 5 ; i++) {
                if(i == 3) throw new RuntimeException("出错啦~~");
                ProducerRecord<String,String> producerRecord = new ProducerRecord<String,String>("topic02","acks" + 1,"test acks");
                producer.send(producerRecord);
                producer.flush();
            }
            producer.commitTransaction();
        }catch (Exception e) {
            System.err.println("出现错误");
            producer.abortTransaction();
        }
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63

# 10.2 消费者

配置

props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG,"read_committed");
1
Last Updated: 3/3/2023, 8:32:40 PM