kafka基本用法,源码中解析如何优雅使用Kafka生产者
今天主要给大家分享源码中解析如何优雅使用Kafka生产者的知识,也会对于kafka基本用法的题进行解,现在开始给各位讲解吧!
专注Java领域优质技术,
前言
有朋友,当消息较多时,Kakfa如何保证消息的效率和一致性。
现在是讨论如何正确有效地发送消息以及Kakfa源代码的好时机。
内容较多,对源码感兴趣的朋友请系好安全带,源码是基于v01000版本进行分析的。同时最好有一定的使用Kafka的经验,了解其基本用法。发送一条简单的消息
在分析之前,我们先来看看简单的消息发送是什么样子的。
以下代码是基于SpringBoot构建的。首先,为orgapachekafkaclientsProducerProducer创建一个bean。
我们主要关注引导服务器,这是一个必需的参数。Kafka集群的broker地址,如127001:9094。
剩下的参数我们暂时不讨论,稍后会详细介绍。然后我们插入这个bean来调用send函数来发送消息。
本例中,向特定Topic发送了10万条数据,运行程序消息正常发送。
然而,这只是发送消息,并不能控制消息是否成功传递——它是一个纯粹的异步方法。
同步
那么我想知道我的消息是否发送成功,我该怎么办?
其实Producer的API已经考虑到了这一点,所以发送后只需调用get方法就可以同步获取传输结果。
将结果发送至
这种传输的效率其实比较低,因为每次都要同步等待消息传输结果。
异步
为此你需要异步发送,但实际上send方法默认是异步的,除非你手动调用get方法。
但是,使用此方法您无法知道传输结果。
因此,如果您查看发送API,您会发现还有另一个参数。
未来lt;记录元数据gt;发送生产者记录lt;K,Vgt;Producer,Callback回调;Callback是一个回调接口,允许您在发送消息后回调自定义实现。
运行后结果
同样的方法也可以得到结果,在上面的同步过程中,发现回调线程不是主线程,这也证明了是异步回调。
回调时同时传递两个参数。
RecordMetadata是成功发送符合上述条件的消息后的元数据。
Exception消息传输过程中的异常信息。
不过,这两个参数永远不会同时有数据,只有在传输失败时才会显示异常消息,同时元数据为空。
所以正确的写法是
为什么只有一个参数有值,下面源码分析中会一一解释。源码分析
您现在已经掌握了基本的消息传递。如果想深入了解传输过程中一些参数的配置,源码说了算。
首先,如果我们看一下发送消息的整个流程,Kafka不仅仅是通过网络向broker发送消息,它在Java内部经过了大量的优化和设计。
转移过程
为了帮助您直观地了解发送过程,我简要概述了发送过程中的一些关键步骤。
从上到下
kafka-Producer-network-thread初始化并实际发送IO线程消息。
序列化消息。
获取需要转移的分区。
写入内部缓冲区。
初始化的IO线程不断使用这个缓存来发送消息。
步骤分析
下面详细解释每个步骤。
重置
调用此构造函数进行初始化不仅仅只是将默认参数写入KafkaProducer。更大的题是初始化发送者线程以消耗缓冲区。
初始化IO线程。
可以看到Sender线程需要以下成员变量
确认、重试、requestTimeout等这些参数稍后分析。
序列化消息
调用发送函数后的第一步是序列化。最终,我们的消息必须穿过网络才能发送到Kafka。
其中valueSerializerserializerecordtopic,recordvalue;是一个接口,初始化时必须指定序列化实现类。
我们也可以自己实现序列化。您只需要实现orgapachekafkacommonserializationSerializer接口。
路由分区
接下来是路由分区。通常,我们使用的主题会创建多个分区以实现可扩展性和高性能。
如果是分区的话,所有的消息都可以写在这里。
但是当您有多个分区时,不可避免地要知道要写入哪个分区。
一般有以下三种方式
分区
构建ProducerRecord时,可以为每条消息指定一个分区。
这样判断路由时是否指定,如果是则直接使用对应的分区。
这通常用于特殊场景。
自定义路由策略
如果不指定分区,则调用partitionerpartition接口执行用户自定义的分区策略。
而只需要自定义一个类来实现orgapachekafkaclientsProducerPartitioner接口,并在创建KafkaProducer实例时配置partitionerclass参数即可。
一般来说,您应该自定义分区以最好地确保消息排序。
或者将其写入某个唯一的分区并让特殊的使用者处理它。
默认策略
最后一项是默认路由策略,如果您不执行任何操作,它将运行。
这种策略也使得消息分发更加均匀。
我们来看看实现。
简单来说,分为以下几个步骤
获取主题分区的数量。
+1表示内部维护的线程安全计数器。
为了获得分区数,我们计算分区数的模数。
在实际中,这是一种很常见的轮询算法,因此只要分区数量不频繁变化,这种方法是比较统一的。
写入内部缓存
send方法获得分区后,会调用附加函数。
该函数调用getOrCreateDeque将批处理写入内部缓存。
消费缓存
最初初始化的IO线程实际上是一个守护线程,它不断地使用这个数据。
前面写的数据是通过图中的几个特征得到的。这个不需要深入研究,但是有一个CompleteBatch方法非常重要。
调用该方法时,必须发送一条消息,因此调用batch来完成send方法定义的回调接口。
这里你也可以明白为什么我前面说传输完成后只会显示一项元数据和异常信息了。生产者参数分析
完成传输过程后,我们来看看Producer中一些比较重要的参数。
赞同
acks是影响消息吞吐量的主要参数。
主要选项为[all,-1,0,1],默认为1。
这是因为Kafka并不采用主备模式,而是采用类似于Zookeeper的主备模式。
前提是该主题由多个副本组成Replicas>1。如果acks=全部/-1
这意味着确保所有跟随者副本在返回之前都已完成数据写入。
这样您就不会丢失任何消息!
但同时,它的性能和吞吐量也是最低的。如果ack=0
生产者不会等待副本的响应。这是最容易丢失消息的方法,但同时它的性能也是最好的!
如果ack=1
这是一个折衷的解决方案它等待副本领导者的响应,但不等待跟随者的响应。
如果领导挂断,消息就会丢失。但性能和消息安全都得到了一定程度的保证。
批量大小
顾名思义,该参数了内部缓冲区的大小,因此适当增大它可以提高吞吐量。
但不要太极端,设置太高会浪费内存。如果太小,就不会有效,这是典型的时空权衡。
上面的照片反映了它的多种用途。
重试
retries该参数主要用于重试,如果出现一些网络抖动就会发生重试。
该参数还重试次数。
但还存在其他题。
因为是重传,所以消息顺序可能不一致,甚至存在上面提到的分段消息也可能不完全顺序的情况。
或者,如果网络题导致消息成功创建但未成功响应生产者,重试可能会导致重复消息。这只能由消费者幂等地完成。
高效的交付方式
如果您的消息量非常大,您需要尽快将消息传输到Kafka。生产者总是受到缓存大小等的影响。
我可以创建多个生产者进行传输吗?
配置最大生产者数量。
发送消息时,首先获取Producer,获取时检查是否达到最大,如果没有,则创建一个新的并存储到内部List中,同时同步save过程,防止并发题。
检索发件人时,您可以根据默认拆分策略使用轮询。
这样在频繁发送大量消息的场景下,可以提高传输效率,减轻单个生产者的负担。
制片人关闭
最后一步是关闭Producer。Producer在使用过程中会消耗大量资源,必须显式关闭以回收这些资源。
默认关闭方法和限时方法会在一段时间后强制关闭。
但是,剩余的任务将在到期前处理。
所以使用哪一种要根据情况而定。
总结
本文内容较多,从示例和源码方面对Kafka生产者进行了分析。
希望读过本文的朋友有所收获,也欢迎留言讨论。
发表评论