标签(空格分隔): kafka
Kafka 生产者发送消息的过程:
Kafka 会将发送消息包装为 ProducerRecord 对象, ProducerRecord 对象包含了目标主题和要发送的内容,同时还可以指定键和分区。在发送 ProducerRecord 对象前,生产者会先把键和值对象序列化成字节数组,这样它们才能够在网络上传输。
接下来,数据被传给分区器。如果之前已经在 ProducerRecord 对象里指定了分区,那么分区器就不会再做任何事情。如果没有指定分区 ,那么分区器会根据 ProducerRecord 对象的键来选择一个分区,紧接着,这条记录被添加到一个记录批次里,这个批次里的所有消息会被发送到相同的主题和分区上。有一个独立的线程负责把这些记录批次发送到相应的 broker 上。
服务器在收到这些消息时会返回一个响应。如果消息成功写入 Kafka,就返回一个 RecordMetaData 对象,它包含了主题和分区信息,以及记录在分区里的偏移量。如果写入失败,则会返回一个错误。生产者在收到错误之后会尝试重新发送消息,如果达到指定的重试次数后还没有成功,则直接抛出异常,不再重试。
思考:怎么使用生产者
场景:是否每个消息都很重要?是否允许丢失一小部分消息?偶尔出现重复消息是否可以接受?是否有严格的延迟和吞吐量要求?
生产数据入库场景:每个消息都比较重要。应该可以运行丢失一小部分消息?可以运行出现重复消息,有严格的延迟和吞吐量要求。
保存点击场景:允许丢失或者重复,可以允许几秒后的延迟,吞吐量要求则比较高。
我们把消息发送给服务器,但井不关心它是否正常到达。大多数情况下,消息会正常到达,因为 kafka 是高可用的,而且生产者会自动尝试重发。不过,使用这种方式有时候也会丢失一些消息。
send () 方法会返一个包含 RecordMetadata Future 对象,不过因为会忽略返回值,所以无法知道消息是否发送成功。
问题:需要等服务器响应吗?TCP长链接,会有一个请求标识,等响应返回后可以通过Future再次拿出来?
使用 send () 方法发送消息 它会返回 Future 对象,调用 get () 方法进行等待就可以知道消息是否发送成功。
我们调用 send () 方法,并指定一个回调函数,服务器在返回响应时调用该函数。
假设消息在应用程序和 Kafka集群之间一个来回需要lOms。如果在发送完每个消息后都等待回应,那么发送 100 个消息需要1s 。但如果只发送消息而不等待响应,那么发送100 个消息所需要的时间会少很多。
目的:在遇到消息发送失败时,我们需要抛出异常、记录错误日志,或者把消息写入“错误消息”文件以便日后分析。
- acks=0 生产者在成功写入消息之前不会等待任何来自服务器的响应。
- acks=1 ,只要集群的首领节点收到消息,生产者就会收到一个来自服务器的成功响应。不过,如果一个没有收到消息的节点成为新首领,消息还是会丢失。这个时候的吞吐量取决于使用的是同步发送还是异步发送。如果让发送客户端等待服务器的响应(通过调用Future对象的 get ()方法),显然会增加延迟(在网络上传输一个来回的延迟)。如果客户端使用回调,延迟问题就可以得到缓解,不过吞吐量还是会受发送中消息数量的限制(比如,生产者在收到服务器响应之前可以发送多少个消息)。
- acks=all ,只有当所有参与复制的节点全部收到消息时,生产者才会一个来自服务器的成功响应。这种模式是最安全的,它可以保证不止一个服务器收到消息,就算有服务器发生崩溃,整个集群仍然可以运行。
buffer.memory:如果应用程序发送消息的速度超过发送到服务器的速度,会导致生产者空间不足。 send ()方法调用要么被阻塞,要么抛出异常。
取决于 max.block.ms ,表示在抛出异常之前可以阻塞一段时间)。
默认情况下,消息发送时不会被压缩。该参数可以设置为snappy 、 gzip 或 lz4 消息被发送给 broker 之前使用哪一种压缩算也进行压缩。snappy 压缩算怯由 Google巳发明,它占用较少的 CPU ,却能提供较好的性能和相当可观的压缩比,如果比较关注性能和网络带宽,可以使用这种算法。 gzip 压缩算法一般会占用较多的 CPU ,但会提供更高的压缩比,所以如果网络带宽比较有限,可以使用这种算法。使用压缩可以降低网络传输开销和存储开销,而这往往是向 Kafka 发送消息的瓶颈所在。
生产者从服务器收到的错误有可能是临时性的错误(比如分区找不到首领)。在这种情况下, retries 参数的值决定了生产者可以重发消息的次数,如果达到这个次数,生产者会放弃重试并返回错民。
重试可以通过retry.backoff.ms参数来改变这个时间间隔。建议在设置重试次数和重试时间间隔之前,先测试一下恢复一个崩溃节点需要多少时间(比如所有分区选举出首领需要多长时间)。
当有多个消息需要被发送到同一个分区时,生产者会把它们放在同一个批次里。该参数指定了一个批次可以使用的内存大小,按照字节数计算(而不是消息个数)。得注意max_request_size一个发送请求最大的值。
KafkaProduce会在一次填满或linger.ms达到上限时把批次发送出去。虽然这样会增加延迟,但 会提升吞吐量(因为一次性发送更多的消息,每个消息的开销就变小了).类似TCP的Nagle算法
该参数指定了生产者在到服务器晌应之前可以发送多少个消息。它的值越高,就会占用越多的内存,不过会提升吞吐量。 它设为1可以保证消息是按照发送的顺序入服器的,即使发生了重试。
需要注意的是:如果设定重试次数,并且上面的次数大于1,那么,如果第一个批次消息写入失败,而第二个批次写入成功, broker会重试写入第一个批次。如果此时第 一个批次也写入成功,那两个批次的顺序就反过来了!!
不建议把 retry 设为1 。可以max.in.flight.requests.per.connection设为1,这样在生产者尝试发送第一批消息时,就不会有其他的消息发送给broker。不过这样会严重影响生产者的吞吐量,所以只有在对消息的顺序有严格要求的情况下才能这么做。
问题: max.in.flight.requests.per.connection默认值是多少?吞吐量影响会有多大?默认是: 5
思考:那生产环境应该要怎么设?binlog为例子:顺序是很重要的,所以得设成顺序发送吗?吞吐量降低很多。那如果允许批次顺序相反吗?肯定不行。那允许不重试,但是不等发送设为大于一,可以吗?那么就会丢了第一批数据,会造成什么影响?通过记录错误,并进行自行恢复进行最大化使用吞吐量?
结论:最好还是设允许重试,max.in.flight.requests.per.connection看情况。如果有顺序性要求,那么只能顺序发送,设为1,虽然影响吞吐量,不过应该可以通过增加生产者的数量来提供更大的吞吐量。
思考:所以dj要怎么配置与写生产者?不用自己写buffer类了,直接设batch.size与linger.ms就可以了。
那可靠行呢?上面提到生产数据入库场景:每个消息都较重要。不过应该可以运行丢失一小部分消息?可以运行出现重复消息,有严格的延迟和吞吐量要求。所以重试还是要设的,max.in.flight.requests.per.connection也可以设为大于1,并没有顺序性要求。
需要同步吗?直接发送后忽略会怎么样?会丢数据。那异步回调咯,如果发送错误,可以记录错误日志。
事务性要求?如果中间死机了,不管同步还是异步,还是会丢失数据的,假设是没有什么事务保证,除非要做到向mysql一样,有redo log、分布式事务之类的保证。所以假设需要事务保证,你可以在同步的返回的时候,提交。或者在异步回调的时候提交。所以死机这种情况,跟同步还是异步没有太大关系。主要是你的事务没有提交,那么就会重复生产。保证原子性很难,故保证支持重复生产或者消费更靠谱,幂等性。
注意,即使在有延迟ms=0的情况下,那些在时间上很接近的记录通常也会被批处理在一起,所以在高负载下,无论延迟配置如何,都会发生批处理