You need to enable JavaScript to run this app.
导航

Kafka 消费者最佳实践

最近更新时间2024.01.15 11:10:55

首次发布时间2024.01.15 11:10:55

本文档以 Confluent 官方 Java 版本客户端 SDK 为例,介绍使用火山引擎 Kafka 实例时的消费者最佳实践。

广播与单播

在同一个消费组内部,每个消息都预期仅仅只被消费组内的某个消费者消费一次,因而使用同一个消费组的不同消费者之间,即可实现消息的单播消费。
在不同的消费组之间,每个消息都预期可以被每个消费组分别消费一次,因而使用不同消费组的不同消费者之间,即可实现消息的广播消费。

幂等性

消息是否被客户端消费,在服务端的认知中,仅和保存在服务端的消费位点有关。而消费位点是由消费者调用相关 API 从而记录到服务端,那么在客户端起停导致的重均衡过程中,很可能会出现消费位点未及时同步到服务端的现象。
因而,即使在同一个消费组内的不同消费者,也无法完全保证一条消息仅仅只会被消费一次。消费者若需要实现完全的幂等,可以通过在消息中添加额外的标识字段等方式在消费到消息后,再进行二次校验。

Topic 消费

消费者支持通过以下方式指定 Topic:

  • 订阅(Subscribe):标准的消费者使用方式,客户端封装了一套完整的消费订阅模型,包括每个消费者需要消费的分区分配、消费者加入或退出的重均衡等。
  • 自由分配(Assign):完全由业务自己指定消费者需要消费的分区信息,不同消费者之间的消费协调等都需要业务自己实现。

推荐直接使用订阅(Subscribe)的方式。

消费模型

消费者使用拉模型进行数据读取,需要保证拉取的线程不会异常退出或者被阻塞,否则会导致无法正常发起消费请求。
消费者的所有请求发送和响应几乎都基于消费者poll方法的调用。
若客户端使用订阅(Subscribe)的方式进行消费,那么在使用过程中,需要保证poll方法在固定的周期内进行调用,最长不能超过max.poll.interval.ms的配置,默认 300000ms,该参数定义了两次poll方法调用的最大时间间隔,超过该时间间隔,会导致服务端认为消费者异常,从而将其从消费组中踢出。同时过长的间隔,也可能会影响到消费组重均衡的执行,导致长时间的消费卡顿。

重均衡

客户端使用订阅(Subscribe)的方式进行消费时,在消费组的生命周期中将可能在以下不同的状态之间进行流转:

  • PreparingRebalance:消费组正在进行分区重分配。
  • CompletingRebalance:消费组完成了分区重分配的计算,并等待所有的分配结果下发到指定消费者。
  • Stable:分配结果同步到各个消费者后,消费组会进入此状态,开始进行消费处理。
  • Empty:消费组当前没有激活的消费者,也没有在进行消费。

通常一个正常的消费组预期应该长期保持在 Stable 状态进行正常的消费业务处理。
当一个订阅中的消费组有新的消费者加入或者老的消费者退出/失败时,将会触发一次消费组的重均衡动作。消费组将进入PreparingRebalance状态,然后等待当前所有的消费者重新加入消费组。所有消费组触发完成后,重新计算分区的分配并进入CompletingRebalance状态,并等待各个消费者完成各自分区的获取,进入 Stable 状态,开始正常处理消费逻辑。
从以上逻辑可以看出,重均衡逻辑中存在两处场景需要与消费者进行业务交互,而消费者的所有请求处理都需要通过明确的poll方法调用来进行触发,因而阻塞poll方法的正常调用,很容易导致消费组重均衡任务的长期卡顿,甚至超时。
建议在消费者使用中保证两次poll方法的调用间隔不要超过10s,对于下游业务消息处理慢的场景,可以考虑有优化下游处理速度或者通过异步消息处理的方式来实现。
另外还需要注意的是,若使用自由分配(Assign)的方式来进行消费的话,消费组的状态将一直保持在Empty状态。

消费提交

客户端默认的提交方式为定期自动提交的方式,由以下配置决定:

  • enable.auto.commit 参数定义是否开启自动提交,默认为 true。
  • auto.commit.interval.ms 参数定义自动提交的周期,默认为 5000ms。

在实际业务中推荐关闭自动提交,在消息处理完成后,由业务侧调用commitAsync的方法进行消费位点的提交,以避免消息处理失败后,因自动提交可能导致无法重试的问题。同时需要注意业务主动调用提交方法的频率不宜太快,处理完一批消息后定时提交一次即可,推荐 5s 提交一次。

消费性能

消费组中可以同时运行的消费者的并发数,与所消费的 Topic 分区数相关,最多不能超过分区个数。因而当消费组产生堆积时可以参考以下方式处理:

  • 若消费者个数小于分区数,则可以通过增加消费组中消费者个数的方式,尝试增加消费性能。
  • 若消费者个数已等于分区数,则可以先进行 Topic 的分区数扩容,之后再尝试增加消费者个数。注意,Topic 分区扩容可能会打乱原来分区有序的消息。
  • 消费者流量较大的情况下,也可以修改receive.buffer.bytes调整 TCP 的接受缓存区大小,默认为 64KB。建议修改为 1MB。

多线程使用

消费者与生产者不同,不是线程安全的,不支持多个线程调用相同的消费者对象。每个消费者都需要放在一个独立的业务线程中调用。

优雅退出

消费者退出时,推荐调用close方法进行关闭,主动中断与服务端的业务。否则可能会导致消费者未正常发送退出请求,阻塞服务端消费组的业务,默认阻塞 10s。
避免频繁的创建和关闭消费者,每次创建或关闭都会引起消费组的重均衡,重均衡状态的消费组无法正常获取消息。

域名解析

火山引擎 Kafka 实例为分布式集群部署,初始接入点使用域名的方式提供。当客户端使用域名接入时,推荐设置客户端的 DNS 解析方式为全部 IP 解析,即 client.dns.lookup=use_all_dns_ips。保证分布式集群在发生后端节点变化的时候客户端仍然能够正常使用。