kafka消费者接收和配置

  • kafka消息接收方式
  • kafka消息偏移量提交
  • kafka序列化

  • kafka消费者配置

{———-}

kafka消息接收方式

  • kafka有两种消息接收方式、1:主动拉取、2:服务器端推送
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32


/**
* kafka消费者
*/
public void test04() {
Properties properties = new Properties();
properties.put("bootstrap.servers", "broker1:9002,broker2:9002");
properties.put("group.id", "CountryCounter");
properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);

//订阅主题
consumer.subscribe(Collections.singletonList("customerCountries"));
//订阅所有主题
consumer.subscribe(Collections.singletonList("test.*"));

try {
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records) {
logger.debug("topic=%s,partition=%s,offset=%d,constomer=%s,country=%s\n",
record.topic(),record.partition(),record.offset(),record.key(),record.value());

//todo 执行消费逻辑
}
}
} catch (Exception e) {

}
}

kafka消息偏移量提交

  • 提交已处理消息标记位

kafka序列化

  • kafka使用avro序列化
  • 可以看这边文章、Apache Avro

kafka消费者配置

  • fetch.min.bytes
    • 指定消费者从服务器获取记录的最小字节数
  • fetch.max.wait.ms
    • 累积多少时间把数据返还给消费者
    • 数值大、吞吐量大、延迟高、
    • 数值小、吞吐量小、延迟低、
  • max.partition.fetch.bytes

    • 指定每个分区返给消费者的最大字节数、默认值1MB
    • 如果有20个分区、5个消费者、那么这里就可以配置4MB
    • 该值一定要大于broker能够接收数据最大值、否则会一直挂起重试
  • session.timeout.ms

    • 指定消费者在被认为死亡之前可以与服务器断开连接时间、默认3s
    • 如果规定时间内没有发送心跳、认为死亡、触发再均衡
  • auto.offset.reset

    • 指定 消费者在读取一个没有偏移量的分区或者偏移量无效情况下(因消费者长时间失效、包含偏移量的记录已经过时并被删除)该如何处理
    • 默认值 latest、最新数据
    • earliest 在偏移量无效情况下、消费者将从起始位置读取分区记录【幂等消费?】
  • enable.auto.commit

    • 指定消费者是否自动提交偏移量 默认值true
  • partition.assignment.strategy

    • 指定分配主题分区给消费者的分配策略
    • Range 轮询、如果分区数量大于消费者数量、那么排在前面的消费者会被分配到更多的分区
    • RoundRobin 给消费者分配更加平等数量的分区、或者最多小差一个分区
  • client.id

    • 分区id 标识分区唯一、以及客户端、识别消息、也被用在日志和度量指标和配额里
  • max.poll.records

    • 指定单次调用call()方法能够返回的记录数量
  • receive.buffer.bytes、send.buffer.bytes

    • socket在读写数据时用到的TCP缓冲区 -1使用系统默认值
    • 如果消费者和生产者不再统一数据中心内、可以适当增大该值

kafka消费者注意点

  • 消费者数量大于主题分区数量、否则消费者会被闲置;比如5个主题分区、4个消费者、那么会有一个主题分区会被闲置
  • 再均衡
    • 分区所有权从一个消费者转移到另一个消费者、这样的行为被称为再均衡
    • 再均衡期间、消费者不能消费消息
    • 也是这样的设置、能是消费者分区能动态增减
    • 维持:消费者通过向消费者协调者broker发送心跳、来维持他们与群组的从属关系以及他们对分区的所有权关系
    • 消费者死亡、消费协调者broker会等待几秒钟、确认死亡后会触发再均衡