消费者
消费者
介绍
Kafka是一个分布式的消息系统,消费者是Kafka中的一个重要组件之一。消费者可以订阅一个或者多个主题(topic),并从主题中消费消息(message)。Kafka的消费者采用发布-订阅模式,可以通过多个消费者并发地消费同一个主题中的消息,每个消费者消费的消息都是独立的,不会相互影响。
在Kafka中,消费者基于消费者组(consumer group)进行组织。一个消费者组可以包含多个消费者实例,每个消费者实例可以消费一个或多个分区(partition)中的消息。一个分区只能由同一消费者组中的一个消费者实例消费,一个消费者组中的多个消费者实例可以同时消费多个分区。
示例
下面是一个简单的Java示例,演示如何使用Kafka的消费者API从一个主题中消费消息:
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.util.Collections;
import java.util.Properties;
public class ConsumerExample {
public static void main(String[] args) {
// 配置消费者属性
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test-group");
props.put("key.deserializer", StringDeserializer.class.getName());
props.put("value.deserializer", StringDeserializer.class.getName());
// 创建消费者实例
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
// 订阅主题
consumer.subscribe(Collections.singletonList("test-topic"));
// 消费消息
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records) {
System.out.printf("消费者收到消息,主题:%s,分区:%d,偏移量:%d,内容:%s%n",
record.topic(), record.partition(), record.offset(), record.value());
}
}
}
}
在上面的示例中,我们首先配置了消费者的属性,包括Kafka集群的地址、消费者组的ID、键值反序列化器等。然后创建了一个KafkaConsumer实例,并订阅了一个主题。最后在一个while循环中,使用poll方法从Kafka中拉取消息,然后遍历消息列表,逐条处理每条消息。