侧边栏壁纸
博主头像
小武哥博主等级

专注做好每一件小事。

  • 累计撰写 45 篇文章
  • 累计创建 58 个标签
  • 累计收到 3 条评论

Kafka常见客户端与服务端参数配置大全

小武哥
2021-04-18 / 0 评论 / 0 点赞 / 400 阅读 / 2,651 字 / 正在检测是否收录...
温馨提示:
本文最后更新于 2021-05-14,若内容或图片失效,请留言反馈。部分素材来自网络,若不小心影响到您的利益,请联系我们删除。

服务端参数配置

在kafka服务端的server.properties中的配置

listeners=PLAINTEXT://localhost:9092 #内网使用的连接地址
advertised.listeners=PLAINTEXT://localhost:9092 #advertised_listeners 是对外暴露的服务端口,真正建立连接用的是 listeners
log.dirs=/tmp/kafka-logs #kafka brokers存放消息的文件路径
num.partitions=1 #默认分区大小
log.retention.hours=168 #分区中消息存放时间
log.retention.bytes=1073741824 #broker存放消息磁盘最大大小
zookeeper.connect=xx:2181,xx:2181 #zookeeper集群大小
broker.id=100 #broker的id,在集群中必须唯一
message.max.bytes=10485760 #10M broker接收最大消息大小
auto.create.topics.enable=false #禁止自动创建topic,由运维人员统一管理
unclean.leader.election.enable=false #禁止落后太多的副本竞选leader,可能会丢消息
auto.leader.rebalance.enable=false #禁止kafka定期的对一些topic的分区进行重选举

创建topic时用到的参数

retentions.ms=1000 #覆盖在server.properties中的全局存放时间
retentions.bytes=1048576 #覆盖在server.properties中全局的消息最大存放大小

kafka时scala开发的,实际最后还是运行在jvm之上,在bin/kafka-server-start.sh中的调整KAFKA_HEAP_OPTS大小

export KAFKA_HEAP_OPTS="-Xmx6G -Xms6G"

操作系统调整参数

ulimit	-n	1000000 #打开文件描述符调大

客户端参数配置

生产者

max.request.size=10485760 #最大发送消息大小
bootstrap.servers=localhost:9092 #kafka集群地址
retries=1 #发生异常重试次数
max.in.flight.requests.per.connection=1 #该参数指定了生产者在收到服务器响应之前可以发送多少个消息。它的值越高,就会占用越多的内存,不过也会提升吞吐量。把它设为 1 可以保证消息是按照发送的顺序写入服务器的,即使发生了重试。
acks=0|1|all #0代表不管brokers端是否接收到、1代表分区leader接收到、all代表分区leader和所有副本都接收到了,才代表发送成功
retry.backoff.ms=200 #消息重试间隔时间
key.serializer=org.apache.kafka.common.serialization.ByteArraySerializer.class #key序列化方式
value.serializer=org.apache.kafka.common.serialization.ByteArraySerializer.class #value序列化方式
compression.type=gzip #消息压缩算法
linger.ms=10 #一个Batch被创建之后,最多等待延迟时间,到点时间后不管这个Batch有没有写满,消息都必须发送出去
batch.size=16384 #batch.size默认值是16KB,kafka Producer凑够16KB的数据才会发送
max.request.size=1638 #客户端能够发送的最大消息大小
buffer.memory=67108864 #KafkaProducer能够使用的内存缓冲的大小的,默认值32MB,buffer.memory设置的太小,可能导致的问题是:消息快速的写入内存缓冲里,但Sender线程来不及把Request发送到Kafka服务器,会造成内存缓冲很快就被写满。而一旦被写满,就会阻塞用户线程,不让继续往Kafka写消息了

消费者

group.id=log #消费者组id
enable.auto.commit=false #禁止自动提交位移
session.timeout.ms=60000 #向消费者所消费的分区的broker的coordinator提交心跳信息最大间隔,避免被认为挂掉
heartbeat.interval.ms=100 #发送心跳时间的频率
max.poll.interval.ms=30000 #consumer两次调用poll的最大间隔,超过这个间隔则认为该消费者挂掉
auto.offset.reset=earliest|latest|none #earliest 已经消费过数据,从上次消费处接着消费。未消费从最早一条消费,lastest:已经消费过数据,从上次消费处接着消费。未消费从最新一条开始消费,none:各分区都存在已提交的offset时,从提交的offest处开始消费;只要有一个分区不存在已提交的offset,则抛出异常
max.poll.records=10 #一次poll拉取消息的个数
key.deserializer=org.apache.kafka.common.serialization.ByteArrayDeserializer.class #key的反序化方法
value.deserializer=org.apache.kafka.common.serialization.ByteArrayDeserializer.class #value的反序列化方法
fetch.min|max.bytes=1024*1024*5 #一次拉取最少|最多消息大小
0

评论区