1、一个consumer多个partition的情况
因 kafka 0.9.0.x 只能控制 poll 数据的时间，如果每次 fetch 的数据过多，而 consumer 在 session timeout 的时间内没有处理完的话，Coordinator 会认为该 consumer 已经挂掉，然后触发 rebalance，重新分配 partition。
这里只贴一段代码:
public void init() { try { PRoperties = new Properties(); InputStream in = this.getClass().getResourceAsStream("/kafka/grouponKafkaConsumer.properties"); properties.load(in); consumer = new KafkaConsumer<>(properties); consumer.subscribe(Arrays.asList(properties.getProperty("topic"))); new Thread(new Runnable() { @Override public void run() { while(true) { //每5s取一次数据 ConsumerRecords<String, String> records = consumer.poll(5000); //遍历所有的partition for(TopicPartition tp : records.partitions()) { //每次处理多少条数据 int maxpoll = 0; //当前消息的offset long currentOffset = 0; //获取partition中的消息 List<ConsumerRecord<String, String>> partitionRecords = records.records(tp); for (ConsumerRecord<String, String> record : partitionRecords) { try { JSONObject json = JSONObject.parSEObject(record.value()); Integer Operatetype = json.getInteger("operatetype"); if(operatetype != null) { ecarGrouponService.createGrouponDoc(json.toString()); } else { ecarGrouponService.updateGrouponDoc(json.toString()); } } catch (Exception e) { logger.error(e); e.printStackTrace(); } currentOffset = record.offset(); maxpoll++; if(maxpoll > 20) { break; } } //指定消费到的位置 long lastoffset = currentOffset + 1; //指定下次poll的位置 consumer.seek(tp, lastoffset); consumer.commitSync(Collections.singletonMap(tp, new OffsetAndMetadata(lastoffset))); } } } }).start(); } catch (Exception e) { logger.error(e); e.printStackTrace(); } }新闻热点
疑难解答