以下是Kafka消费者配置的核心参数及说明,关键参数需根据业务场景调整:
基础配置
bootstrap.servers:必填,Kafka集群地址列表(如host1:port1,host2:port2),建议配置3个以上节点以防单点故障。group.id:必填,消费者组ID,同组消费者共同消费分区,不同组可并行消费同一主题。key.deserializer/value.deserializer:必填,消息键值的反序列化类(如StringDeserializer、IntegerDeserializer)。
消费行为控制
auto.offset.reset:无偏移量时的消费起点,可选latest(最新消息,默认)、earliest(最早消息)、none(无偏移量则报错)。enable.auto.commit:是否自动提交偏移量,默认true。生产环境建议设为false,手动提交以保证消息不丢失。auto.commit.interval.ms:自动提交间隔(毫秒),仅在enable.auto.commit=true时生效,默认5000。
性能优化参数
fetch.min.bytes:单次拉取的最小数据量(字节),默认1,增大可减少网络请求次数,但可能增加延迟。fetch.max.bytes:单次拉取的最大数据量(字节),默认50MB,需与消费者内存匹配,避免OOM。max.poll.records:单次poll()返回的最大消息数,默认500。处理慢时调小(如100),避免触发超时Rebalance。max.poll.interval.ms:两次poll()的最大间隔时间(毫秒),默认5分钟。处理耗时长的场景需增大,避免被误判为离线。
心跳与会话管理
heartbeat.interval.ms:发送心跳的间隔(毫秒),默认3000,需小于session.timeout.ms的1/3。session.timeout.ms:消费者被视为离线的超时时间(毫秒),默认10秒。网络抖动时需调大,避免频繁Rebalance。
分区分配策略
partition.assignment.strategy:分区分配策略,默认RangeAssignor(按范围分配)。可选RoundRobinAssignor(轮询分配,均衡负载)或StickyAssignor(粘性分配,减少Rebalance开销)。
安全与连接配置
security.protocol:通信协议,默认PLAINTEXT。生产环境建议用SSL或SASL_SSL。ssl.keystore.location/ssl.truststore.location:SSL证书路径,用于双向认证。
其他关键参数
isolation.level:事务隔离级别,默认read_uncommitted(可读未提交消息)。如需严格一致性,设为read_committed。client.id:客户端标识,用于监控和日志追踪,建议设置为业务名称。
调优建议:
- 高吞吐场景:增大
fetch.max.bytes和max.poll.records,调小heartbeat.interval.ms。 - 低延迟场景:减小
fetch.min.bytes,调大session.timeout.ms。 - 关键业务:禁用自动提交(
enable.auto.commit=false),手动控制偏移量提交时机。
参数详情可参考官方文档:Kafka Consumer Configs。