You need to enable JavaScript to run this app.
导航

Kafka 生产者最佳实践

最近更新时间2024.01.15 11:10:55

首次发布时间2023.11.03 16:05:48

本文档以 Confluent 官方的 Java 版本 SDK 为例介绍 Kafka 生产者和消费者的使用建议。推荐在使用消息队列 Kafka版进行消息生产与消费之前,阅读以下使用建议,提高接入效率和业务稳定性。

消息顺序性

火山引擎 Kafka 实例的消息在同一分区中可以保证数据的先入先出。即写入同一分区的消息,若消息 A 先于消息 B 写入,那么在进行消息读取时,消息A也一定可以先于消息 B 被客户端读到。需要注意的是此处仅保证通过同一生产者先后发送的消息可以保证有序,不同生产者之间的消息因为无法确认到达服务端的先后顺序,所以无法保证有序。
基于以上特性,若要实现消息顺序性的能力,可以考虑以下方式:

  • **全局有序:**创建仅 1 分区的 Topic。因为 Topic 仅有一个分区,因而发送过来的消息与生产者客户端发送的消息顺序严格一致。但是 1 分区的 Topic 单从单个 Topic 的角度来看,在消息的写入和读取中都无法发挥集群完整集群性能,只有多个 1 分区的 Topic 同时使用时,才有可能最大限度的发挥集群的性能。
  • **分区有序:**Kafka 分区中消息天然有序,因而也可以通过将需要保证顺序的消息写入到同一分区的方式来实现消息的有序。适用于不需要所有消息都保证顺序或者特定类别的消息保证顺序的场景。

单分区的 Topic 在生产消费性能上会有较大的限制。在实际使用中推荐选择分区有序的方式实现业务逻辑,将需要保序的消息写入相同的分区中实现同类消息的有序。

消息可靠性

acks 配置定义了写入消息确认的方式,并支持以下三种配置:

  • acks=0:不关心消息的写入结果,服务端对于该消息的写入,无论成功失败都不会有任何结果返回。
  • acks=1:服务端在写入主副本之后即可返回写入结果到生产者客户端。
  • acks=-1acks=all:消息需要在主备副本都写入后才可返回写入结果到生产客户端。

acks 的三种配置,从上到下,性能依次下降,数据可靠性依次上升。推荐您直接使用可靠性最高的配置方式。
对于分布式系统,因网络或者主节点切换等问题,可能存在偶现的发送失败问题。您可以通过 retries 参数配置写入失败的重试次数,重试次数默认为长整型的最大值;通过 retry.backoff.ms 配置重试的间隔,间隔默认为 100ms。推荐配置重试次数为 3 次、重试间隔为 1000ms。

分区选择

消息实际在写入时会选择 Topic 中的某一分区进行写入。分区选择逻辑如下:

  • 当消息指定消息 key 时,会根据消息 key 的 hash 结果进行分区选择,相同 key 的消息将会分配到相同的分区编号。因而当为一批消息指定相同的消息 key 时,即可实现这些消息的分区有序。但是需要注意,若执行 Topic 增加分区后,消息 key 选择到的分区编号可能会发生变化。
  • 对于 2.4 以下的客户端版本,若不指定消息 key,则消息会以轮询的方式依次写入到每个分区中。但是此种方式可能导致客户端消息聚合效果不理想,影响发送性能。
  • 对于 2.4 及以上的客户端版本,若不指定消息 key 时,则消息会以粘性分区选择的方式写入分区中,主要是为解决聚合效果不理想的问题。在分区选择时优先写入上一次消息写入的分区,直到客户端对于该分区的消息聚合达到配置的聚合上限(batch.size)。在保证了消息聚合效果的同时,在长时间来看,也达到了分区的写入均衡。
  • 除了以上默认的实现之外,用户也可以自定义实现分区的选择逻辑。

推荐使用默认的分区选择逻辑即可。无消息 key 时默认逻辑本身已经实现了消息在分区中的均衡。对于使用消息 key 的场景,减少消息 key 的 hash 冲突可以有效打散消息,避免部分分区中承载的业务和消息过多。对于自定义实现的分区选择同样也需要注意尽可能的保证分区选择的均衡,避免业务和消息过度集中在部分分区中。

发送结果

生产者消息的发送实现当前为异步逻辑。即调用 send 方法写入消息时,实际消息仅仅是写入本地缓存中,实际并未发送到服务端。消息会在本地缓存中根据分区做一次消息聚合,之后由异步发送线程扫描将聚合后的消息发送到服务端中。
send 方法仅为写入缓存,不代表消息实际写入成功。要获取消息的实际写入结果,当前有以下方式可以选择:

  • 调用 send 方法时为每条消息绑定一个回调函数 Callback。
  • 在生产者中通过配置interceptor.classes注入一个自定义的实现ProducerInterceptor接口的拦截器,该拦截器会将消息写入的结果或异常通过onAcknowledgement方法进行传递。
  • send 方法的返回结果为一个Future对象,可直接调用该对象的get方法,阻塞等待消息的写入结果。

以上三种方式中,推荐使用前两者中的任意一种,第三种其实是一种伪同步的实现方式,会严重影响客户端的生产性能,不推荐使用。

生产性能

生产者通过内存缓存,消息聚合的方式,减少和服务端之间的网络请求,从而达到吞吐性能的大幅度提升。对于生产端的聚合能力,当前支持以下配置的自定义:

  • batch.size 配置定义了单次聚合的最大消息大小。默认大小为 16KB,不建议设置为较小的值。在发现服务端生产请求速率过快导致服务端 CPU 较高的时候,可以适当调大此值,增加聚合能力,减少 CPU 的消耗。
  • linger.ms 配置定义了单次聚合的最小聚合时间。默认为 0ms,即消息尽快发送的方式。适当调整此参数也可以增加消息的聚合力度。但是相对的会增加消息发送的延迟。在消息聚合不佳,生产请求大幅增加的场景下,也可以适当增大此参数,在消息聚合力度和可接受的发送延迟之间选择一个合适的平衡。
  • send.buffer.bytes 配置定义了发送端 TCP 缓冲区的大小,默认为 128KB 大小,若客户端发送的单条消息较大,或者 batch.size 调大之后,需要调整 TCP 缓冲区大小,推荐 128KB-1024KB 之间。

消息时间戳

消息的时间戳支持两种不同的填充方式,为服务端集群的配置,详细说明请参考修改参数配置

  • LogAppendTime:使用服务端写入消息的时间作为消息时间戳。
  • CreateTime:(默认)使用生产者创建消息的时间,也就是消息写入时自带的时间戳。

消息的时间戳会被用于计算消息的过期老化等场景。客户端发送的消息需要保证具备合理的时间戳,一旦消息时间戳填写错误,可能会导致数据不会按照预期的时间进行老化删除。在写入消息后,可通过消息偏移量查询进行排查。通常较老版本的 API 会存在无消息时间戳的问题,建议使用推荐的客户端版本。
Confluent 默认的 SDK 在不指定消息时间戳的情况下,会填入生产者本地的当前时间。若您需要自行指定时间时,应注意填入正确的时间戳,以免影响服务端的消息老化等业务处理。

多线程使用

生产者为线程安全的实现方式,因而在客户端业务实现中,推荐使用生产者池的方式,将生产者提供给不同的多线程业务使用,避免每个生产业务创建独立生产者。

优雅退出

生产者为异步发送的方式,在预期关闭生产之前,推荐调用生产者的flush方法,主动将缓存中的消息推送到服务端,再调用close方法关闭生产者客户端。否则可能会导致部分缓存中的消息在关闭之前未能及时放松,导致消息丢失。

域名解析

火山引擎 Kafka 实例为分布式集群部署,初始接入点使用域名的方式提供。当客户端使用域名接入时,推荐设置客户端的 DNS 解析方式为全部 IP 解析,即 client.dns.lookup=use_all_dns_ips。保证分布式集群在发生后端节点变化的时候客户端仍然能够正常使用。