螺竹编程
发布于 2024-06-01 / 2 阅读
0

Kafka/问题:重复消费问题

介绍

在 Kafka 中,消费者组可以订阅一个或多个主题,并从这些主题中读取消息。Kafka 消费者在读取消息时,可以自动提交偏移量(offset),表示已经处理完了某个消息。但是,由于各种原因,消费者有可能会重复消费已经处理过的消息,这会导致数据的不一致性和重复处理,给业务带来严重的问题。

造成 Kafka 消费者重复消费消息的原因可能有很多,比如网络问题、程序异常退出、消费者重启等。为了避免这种情况,可以采用以下两种解决方案。无论采用哪种方式,都需要确保偏移量的正确提交,以避免重复消费消息的问题。

解决方案

手动提交偏移量

在消费者处理完消息后,手动提交偏移量,这样可以确保已经处理过的消息不会再次被消费。手动提交偏移量的代码示例如下:

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test-group");
props.put("enable.auto.commit", "false"); // 关闭自动提交偏移量
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("test-topic"));

try {
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
        for (ConsumerRecord<String, String> record : records) {
            // 处理消息
        }
        // 手动提交偏移量
        consumer.commitSync();
    }
} finally {
    consumer.close();
}

使用消费者组管理器

Kafka 提供了一个消费者组管理器(Consumer Group Coordinator),它能够在集群中协调消费者组的活动,并管理消费者偏移量的提交。使用消费者组管理器可以确保偏移量的正确提交,避免重复消费消息。使用消费者组管理器的代码示例如下:

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test-group");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("test-topic"));

try {
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
        for (ConsumerRecord<String, String> record : records) {
            // 处理消息
        }
    }
} finally {
    consumer.close();
}