偏移量
偏移量
介绍
在Kafka中,偏移量(Offset)是指一个消费者组(Consumer Group)在Kafka主题(Topic)中已经消费的消息的位置。消费者组中的每个消费者都会记录其消费的偏移量,Kafka会将偏移量保存在特定的主题中,以便在消费者重新连接到主题时,能够从离开的地方继续消费消息,而不会重复消费或者错过消息。
偏移量在Kafka中是用一个64位整数表示的,每个主题分区(Partition)都有自己独立的偏移量。
示例
下面是一个简单的Java例子,演示如何使用Kafka Java客户端来获取并提交偏移量。假设我们有一个名为"MyTopic"的主题,其中包含两个分区,我们的消费者组名称为"MyGroup",并且我们想要从最早的可用偏移量开始消费消息。
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;
import java.util.*;
public class KafkaConsumerExample {
public static void main(String[] args) {
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
properties.setProperty("group.id", "MyGroup");
properties.setProperty("enable.auto.commit", "false");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
consumer.subscribe(Collections.singletonList("MyTopic"));
try {
while (true) {
ConsumerRecords<String, String> records = consumer.poll(1000);
for (ConsumerRecord<String, String> record : records) {
System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
TopicPartition topicPartition = new TopicPartition(record.topic(), record.partition());
OffsetAndMetadata offsetAndMetadata = new OffsetAndMetadata(record.offset() + 1);
Map<TopicPartition, OffsetAndMetadata> offsets = Collections.singletonMap(topicPartition, offsetAndMetadata);
consumer.commitSync(offsets);
}
}
} finally {
consumer.close();
}
}
}
在上面的例子中,我们首先创建了一个Kafka消费者,然后订阅"MyTopic"主题。我们通过设置"enable.auto.commit"为"false"来禁用自动提交偏移量。这样我们就需要手动提交偏移量。
然后我们进入一个无限循环,在每次循环中,我们调用consumer.poll方法来获取新的消息,然后遍历消息记录,打印出消息的偏移量、键和值。接着,我们构建一个Map对象,将从主题分区中获取到的偏移量加1,然后将其提交给Kafka服务器。这样,我们就能够确保在消费者关闭或者重新连接时,能够从正确的位置开始消费消息。