螺竹编程
发布于 2024-06-01 / 4 阅读
0

Kafka/核心概念:偏移量

介绍

在Kafka中,偏移量(Offset)是指一个消费者组(Consumer Group)在Kafka主题(Topic)中已经消费的消息的位置。消费者组中的每个消费者都会记录其消费的偏移量,Kafka会将偏移量保存在特定的主题中,以便在消费者重新连接到主题时,能够从离开的地方继续消费消息,而不会重复消费或者错过消息。

偏移量在Kafka中是用一个64位整数表示的,每个主题分区(Partition)都有自己独立的偏移量。

示例

下面是一个简单的Java例子,演示如何使用Kafka Java客户端来获取并提交偏移量。假设我们有一个名为"MyTopic"的主题,其中包含两个分区,我们的消费者组名称为"MyGroup",并且我们想要从最早的可用偏移量开始消费消息。

import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;

import java.util.*;

public class KafkaConsumerExample {
    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "localhost:9092");
        properties.setProperty("group.id", "MyGroup");
        properties.setProperty("enable.auto.commit", "false");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
        consumer.subscribe(Collections.singletonList("MyTopic"));

        try {
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(1000);
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
                    TopicPartition topicPartition = new TopicPartition(record.topic(), record.partition());
                    OffsetAndMetadata offsetAndMetadata = new OffsetAndMetadata(record.offset() + 1);
                    Map<TopicPartition, OffsetAndMetadata> offsets = Collections.singletonMap(topicPartition, offsetAndMetadata);
                    consumer.commitSync(offsets);
                }
            }
        } finally {
            consumer.close();
        }
    }
}

在上面的例子中,我们首先创建了一个Kafka消费者,然后订阅"MyTopic"主题。我们通过设置"enable.auto.commit"为"false"来禁用自动提交偏移量。这样我们就需要手动提交偏移量。

然后我们进入一个无限循环,在每次循环中,我们调用consumer.poll方法来获取新的消息,然后遍历消息记录,打印出消息的偏移量、键和值。接着,我们构建一个Map对象,将从主题分区中获取到的偏移量加1,然后将其提交给Kafka服务器。这样,我们就能够确保在消费者关闭或者重新连接时,能够从正确的位置开始消费消息。