Handling Kafka Consumption Exceptions
org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured session.timeout.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.
Solutions for Kafka duplicate consumption; duplicate consumption and message loss
The interval between two successive poll() calls exceeded the configured session.timeout.ms. The root cause is that processing takes too long, longer than the configured session.timeout.ms. If poll() is not called for a long time, the cluster assumes the consumer has died and no longer lets it commit offsets, which causes data to be consumed repeatedly.
Assuming we are talking about Kafka 0.10.1.0 or later, where each consumer instance employs two threads. One is the user thread, from which poll() is called; the other is the heartbeat thread, which specifically takes care of heartbeats.
session.timeout.ms is for the heartbeat thread. If the coordinator fails to receive any heartbeat from a consumer before this interval elapses, it marks the consumer as failed and triggers a new round of rebalance.
max.poll.interval.ms is for the user thread. If the message processing logic is so heavy that it takes longer than this interval, the coordinator explicitly makes the consumer leave the group and also triggers a new round of rebalance.
heartbeat.interval.ms is used to let other healthy consumers become aware of a rebalance much faster. If the coordinator triggers a rebalance, other consumers only learn of it by receiving a heartbeat response with the REBALANCE_IN_PROGRESS error encapsulated. The more quickly heartbeat requests are sent, the sooner a consumer knows it needs to rejoin the group.
Suggested values:
session.timeout.ms : a relatively low value, 10 seconds for instance.
max.poll.interval.ms : based on your processing requirements
heartbeat.interval.ms : a relatively low value, ideally about 1/3 of session.timeout.ms
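A minimal sketch of these suggestions as a Java Properties object. The exact numbers are illustrative assumptions; a real consumer would pass these to a KafkaConsumer together with bootstrap servers and deserializers.

```java
import java.util.Properties;

public class ConsumerTimeoutConfig {
    // Illustrative values following the suggestions above; the exact
    // numbers are assumptions, not mandated settings.
    static Properties build() {
        Properties props = new Properties();
        props.setProperty("session.timeout.ms", "10000");    // relatively low, e.g. 10 seconds
        props.setProperty("heartbeat.interval.ms", "3000");  // about 1/3 of session.timeout.ms
        props.setProperty("max.poll.interval.ms", "300000"); // sized to your processing time
        props.setProperty("max.poll.records", "100");        // smaller batches per poll()
        return props;
    }

    public static void main(String[] args) {
        System.out.println(build().getProperty("heartbeat.interval.ms"));
    }
}
```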
Adjust the configuration: increase the timeout intervals and reduce the number of records handled per poll (max.poll.records).
Use multiple threads to process records in parallel.
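The multithreaded approach can be sketched without a real broker. Here the records returned by one poll() call are simulated with a list of strings, and a fixed thread pool does the heavy per-record work so the polling thread can return to poll() quickly:

```java
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class ParallelProcessing {
    static final AtomicInteger processed = new AtomicInteger();

    // Stand-in for heavy per-record work, moved off the polling thread.
    static void handle(String record) {
        processed.incrementAndGet();
    }

    public static void main(String[] args) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        // Stand-in for the batch returned by one consumer.poll() call.
        List<String> batch = List.of("r1", "r2", "r3", "r4", "r5");
        for (String record : batch) {
            pool.submit(() -> handle(record)); // polling thread is not blocked
        }
        pool.shutdown();
        pool.awaitTermination(5, TimeUnit.SECONDS);
        System.out.println("processed=" + processed.get());
    }
}
```

One caveat with this pattern: offsets should only be committed once the whole batch has finished processing, otherwise parallelism reintroduces message loss or duplication on restart.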
Causes of Kafka duplicate consumption
If the auto-committed offset is smaller than the offset of the last message the client has processed, the messages between those two offsets will be processed again.
Suppose auto-commit is enabled with a commit interval of 5s, and a rebalance happens 3s after the most recent commit. After the rebalance, consumers resume reading from the last committed offset, which is 3 seconds behind, so the messages that arrived during those 3 seconds are processed twice. Committing more frequently, i.e. with a smaller commit interval, shrinks the window in which duplicate messages can appear, but duplicates cannot be avoided completely this way.
Problems when Kafka consumers share the same consumer group
failed: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing max.poll.interval.ms or by reducing the maximum size of batches returned in poll() with max.poll.records
1. A consumer goes down or offline. The consumer does not have to actually go offline: during a long GC pause, or when network delays keep the consumer from sending heartbeats to the GroupCoordinator for too long, the GroupCoordinator will consider the consumer offline.
Fix: adjust the parameters described above.
2. kafkaConsumer.assign() (direct partition assignment) and subscribe() (group subscription) are used with the same consumer group, i.e. the same group.id. The group coordinator cannot reconcile consumers that share a group.id across these two modes, and directly throws CommitFailedException.
In this case the remedies suggested by the error message, max.poll.interval.ms and max.poll.records, are useless and cannot fix it; the only solution is to use different consumer group ids.
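A sketch of that fix, with hypothetical group names (only the fact that they differ matters): the assign()-style consumer gets its own group.id so it never collides with the subscribe()-style group.

```java
import java.util.Properties;

public class GroupIdSeparation {
    // Hypothetical group names for illustration.
    static Properties subscribeConfig() {
        Properties p = new Properties();
        p.setProperty("group.id", "order-service-subscribe");
        return p;
    }

    static Properties assignConfig() {
        Properties p = new Properties();
        // Must differ from the subscribe() group's id.
        p.setProperty("group.id", "order-service-assign");
        return p;
    }

    public static void main(String[] args) {
        System.out.println(!subscribeConfig().getProperty("group.id")
                .equals(assignConfig().getProperty("group.id")));
    }
}
```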
Why does KafkaSpout consume data it has already consumed?
How an MQ guarantees high availability, and how to handle duplicate consumption, data loss, and ordering
RabbitMQ has three modes: standalone mode, ordinary cluster mode, and mirrored cluster mode.
Kafka architecture: the cluster is composed of multiple brokers, each broker being one node. A topic can be divided into multiple partitions; each partition can live on a different broker, and each partition holds part of the data.
Kafka is a distributed message queue: the data of one topic is spread across multiple machines, with each machine holding part of the data.
Before Kafka 0.8 there was no HA mechanism: if any broker went down, the partitions on it were dead, unreadable and unwritable, so there was no high availability to speak of.
From Kafka 0.8 on there is an HA mechanism, the replica mechanism. Each partition's data is synchronized to other machines, forming multiple replicas of itself. All replicas then elect a leader; production and consumption go through that leader, and the other replicas act as followers. On writes, the leader is responsible for syncing the data to all followers; on reads, you read directly from the leader. Kafka distributes all replicas of a partition evenly across different machines to improve fault tolerance.
If some broker goes down, that is fine: its partitions all have replicas on other machines. If it hosted the leader of some partition, a new leader is elected and everyone continues reading and writing through the new leader. That is what gives high availability.
On a write, the producer writes to the leader, the leader persists the data to its local disk, and the followers then pull the data from the leader on their own. Once a follower has synced the data it sends an ack to the leader; when the leader has received acks from all followers, it returns a write-success response to the producer.
On a read, consumers only read from the leader, and a message only becomes visible to consumers once all followers have synced it and returned their acks.
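The write-path guarantee above corresponds to the producer-side acks setting. A minimal config sketch (values are illustrative; note that in practice acks=all waits for the in-sync replica set, governed by the broker/topic config min.insync.replicas, rather than literally every follower):

```java
import java.util.Properties;

public class DurableProducerConfig {
    static Properties build() {
        Properties p = new Properties();
        p.setProperty("acks", "all");  // leader waits for replica acks before responding
        p.setProperty("retries", "3"); // retry transient send failures
        // Broker/topic side, pair this with min.insync.replicas >= 2
        // so "all" actually means more than just the leader.
        return p;
    }

    public static void main(String[] args) {
        System.out.println(build().getProperty("acks"));
    }
}
```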
Situations where Kafka consumes duplicates:
Kafka has the concept of an offset: every message written gets an offset representing its sequence number. After a consumer has consumed data, it periodically commits the offsets of the messages it has consumed, so that after a restart it resumes from the last committed offset. But if the consumer restarts before an offset is committed, that portion of the messages will be consumed again.
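The restart scenario can be sketched without a broker, treating offsets as plain numbers and the "commit" as just a remembered value:

```java
import java.util.ArrayList;
import java.util.List;

public class OffsetReplay {
    // Consume from `from` (inclusive) to `to` (exclusive),
    // returning the offsets processed.
    static List<Long> consume(long from, long to) {
        List<Long> seen = new ArrayList<>();
        for (long o = from; o < to; o++) seen.add(o);
        return seen;
    }

    public static void main(String[] args) {
        long committed = 5;                   // last offset the consumer managed to commit
        List<Long> firstRun = consume(0, 10); // processed 0..9, crashed before committing 10
        List<Long> secondRun = consume(committed, 10); // restart resumes at the committed offset
        // Offsets 5..9 appear in both runs: these are the duplicates.
        System.out.println("firstRun=" + firstRun.size() + " replayed=" + secondRun.size());
    }
}
```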
How to guarantee idempotent consumption from a message queue:
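One common approach, sketched here with an in-memory set (a production system would use something durable, such as a database unique constraint or Redis), is to track a unique business id per message and skip ids already seen:

```java
import java.util.HashSet;
import java.util.Set;

public class IdempotentConsumer {
    private final Set<String> processedIds = new HashSet<>();
    private int applied = 0;

    // Returns true only the first time a given message id is handled.
    boolean handle(String messageId) {
        if (!processedIds.add(messageId)) {
            return false; // already processed: skip the side effect
        }
        applied++; // the real side effect (DB write, etc.) would go here
        return true;
    }

    int appliedCount() { return applied; }

    public static void main(String[] args) {
        IdempotentConsumer c = new IdempotentConsumer();
        c.handle("order-1");
        c.handle("order-2");
        c.handle("order-1"); // duplicate delivery after a restart
        System.out.println(c.appliedCount()); // prints 2
    }
}
```

With this guard in place, at-least-once delivery from the broker still results in exactly-once side effects.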
On losing data, with any MQ there are generally two cases: either the MQ itself loses the message, or we lose it on the consuming side.
Split into multiple queues with one consumer per queue, which simply means more queues and admittedly more hassle; or keep a single queue with a single consumer, and inside that consumer use in-memory queues for ordering, dispatching to different underlying workers for processing.
Data written to a single partition is guaranteed to be ordered. The producer can specify a key when writing, for example the order id, so that all data related to that order is routed to the same partition. The consumer also reads data out of a partition in order; it then puts each record into a corresponding in-memory queue (as many in-memory queues as there are groups of related data in the partition) and starts multiple threads, with each thread processing one in-memory queue.
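That dispatch scheme can be sketched with standard-library queues (the keys and events here are made up): records with the same key always hash to the same in-memory queue, so a worker draining one queue sees its keys in partition order.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class KeyedOrdering {
    static final int WORKERS = 3;

    // Route every record with the same key to the same in-memory queue.
    static int route(String key) {
        return Math.abs(key.hashCode()) % WORKERS;
    }

    public static void main(String[] args) throws Exception {
        List<BlockingQueue<String>> queues = new ArrayList<>();
        for (int i = 0; i < WORKERS; i++) queues.add(new LinkedBlockingQueue<>());

        // Records as they come out of one partition: already in partition order.
        String[][] records = { {"order-1", "create"}, {"order-2", "create"},
                               {"order-1", "pay"},    {"order-1", "ship"} };
        for (String[] r : records) {
            queues.get(route(r[0])).put(r[0] + ":" + r[1]);
        }

        // One worker thread per queue would drain it; draining preserves
        // insertion order, so order-1 stays create -> pay -> ship.
        List<String> drained = new ArrayList<>();
        queues.get(route("order-1")).drainTo(drained);
        drained.removeIf(e -> !e.startsWith("order-1:"));
        System.out.println(drained);
    }
}
```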