螺竹编程
发布于 2024-06-01 / 2 阅读
0

Kafka/核心概念:消费者

介绍

Kafka是一个分布式的消息系统,消费者是Kafka中的一个重要组件之一。消费者可以订阅一个或者多个主题(topic),并从主题中消费消息(message)。Kafka的消费者采用发布-订阅模式,可以通过多个消费者并发地消费同一个主题中的消息,每个消费者消费的消息都是独立的,不会相互影响。

在Kafka中,消费者基于消费者组(consumer group)进行组织。一个消费者组可以包含多个消费者实例,每个消费者实例可以消费一个或多个分区(partition)中的消息。一个分区只能由同一消费者组中的一个消费者实例消费,一个消费者组中的多个消费者实例可以同时消费多个分区。

示例

下面是一个简单的Java示例,演示如何使用Kafka的消费者API从一个主题中消费消息:

import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.util.Collections;
import java.util.Properties;

public class ConsumerExample {
    public static void main(String[] args) {
        // 配置消费者属性
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "test-group");
        props.put("key.deserializer", StringDeserializer.class.getName());
        props.put("value.deserializer", StringDeserializer.class.getName());

        // 创建消费者实例
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

        // 订阅主题
        consumer.subscribe(Collections.singletonList("test-topic"));

        // 消费消息
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("消费者收到消息,主题:%s,分区:%d,偏移量:%d,内容:%s%n",
                        record.topic(), record.partition(), record.offset(), record.value());
            }
        }
    }
}

在上面的示例中,我们首先配置了消费者的属性,包括Kafka集群的地址、消费者组的ID、键值反序列化器等。然后创建了一个KafkaConsumer实例,并订阅了一个主题。最后在一个while循环中,使用poll方法从Kafka中拉取消息,然后遍历消息列表,逐条处理每条消息。