kafka生产者发送和配置

  • kafka消息发送
  • kafka消息同步发送
  • kafka消息异步发送

  • kafka生产者配置

{———-}

kafka消息发送

1
2
3
4
5
6
7
8
9
10
11
12
/**
* 最简单发送
*/
public void test01() {
ProducerRecord<String, String> record =
new ProducerRecord<>("CustomerCountory", "Precision Products", "France");
try {
producer.send(record);
} catch (Exception e) {
e.printStackTrace();
}
}

kafka消息同步发送

1
2
3
4
5
6
7
8
9
10
11
12
13
14

/**
* kafka同步发送
*/
public void test02() {
ProducerRecord<String, String> record =
new ProducerRecord<>("CustomerCountory", "Precision Products", "France");
try {
Future result = producer.send(record);
result.get();
} catch (Exception e) {
e.printStackTrace();
}
}

kafka消息异步发送

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21

/**
* kafka 异步发送
*/
public void test03(){
ProducerRecord<String, String> record =
new ProducerRecord<>("CustomerCountory", "Precision Products", "France");
try{
Future result = producer.send(record,new DemoProducerCallback());
result.get();
}catch(Exception e){
e.printStackTrace();
}
}

public class DemoProducerCallback implements Callback {
@Override
public void onCompletion(RecordMetadata recordMetadata, Exception e) {

}
}

kafka生产者配置

  • acks

    • acks = 0
      • 不回调通知
    • acks = a
      • 只保存首领节点、即返回消息确认
    • acks = all
      • 保存首领节点和所有副节点后、即返回消息确认
  • buffer.memory

    • 设置生产者内存缓存大小、生产者又来缓冲发送到服务器的消息、
    • 0.9.00版本中、被替换为max.block.ms 表示抛出异常前可以阻塞一段时间
  • compression.type

    • 消息压缩、可设置为snappy、gzip、lz4
    • 指定消息被发送给broker之前使用哪一种压缩算法进行压缩
  • retries

    • 重试次数
    • 默认每次重试等待100ms、可以通过retry.backoff.ms 设置
    • 一般首领选举中易发生重试、建议该时间大于选举时间、崩溃恢复时间长等
  • batch.size

    • 消息批次大小、累积到一定大小发送消息
  • linger.ms

    • 指定生产者在发送批次之前等待更多消息加入批次时间
    • 消息发送两个控制点、一个是累计大小、一个是这个时间、规定时间内没有达到累积值、也要发送消息
  • client.id

    • 可以设置任意字符串
    • 服务器用来判断消息来源、用来标记消息
  • max.in.flight.requests.pre.connection

    • 指定、生产者在受到服务器响应之前可以发送多少个消息、
    • 值越高、就会占用更多内存、也会提升吞吐量、
    • 设置为1 可保证消息桉顺序写入broker服务器
  • timeout.ms、request.timeout.ms、metadata.fetch.timeout.ms

    • timeout.ms 生产者发送数据时等待服务器返回响应时间
    • request.timeout.ms 指定生产者在获取元数据(比如broker首领是谁)等待服务器返回响应时间、超时重试
    • metadata.fetch.timeout.ms 指定 broker等待同步副本返回消息确认时间、超时认为失败
  • max.block.ms

    • 指定调用send方法或者使用partitionFor()方法获取元数据生产者的阻塞时间
    • 超时抛出异常
  • max.request.size

    • 控制生产者发送消息大小、单个请求大小、单个请求中一个批次消息的总大小
  • receive.buffer.bytes、send.buffer.bytes

    • tcp、socket的接收和发送缓冲区大小
  • kafka顺序
    • 可设置max.in.flight.request.pre.connection = 1、但会严重影响吞吐量
    • 这点或许rabbitmq和rocketmq做的更好