首页 > 学院 > 开发设计 > 正文

Kafka 0.9.0.x 控制消费数据数量

2019-11-06 08:12:05
字体:
来源:转载
供稿:网友

1、一个consumer多个partition的情况

因kafka0.9.0.x 只能控制poll数据的时间,如果每次fetch的数据过多而consumer在session timeout的时间内没处理过来的话,Coordinator会认为该consumer已经挂掉了,然后进行rebalance,重新分配partition

这里只贴一段代码:

public void init() { try { PRoperties = new Properties(); InputStream in = this.getClass().getResourceAsStream("/kafka/grouponKafkaConsumer.properties"); properties.load(in); consumer = new KafkaConsumer<>(properties); consumer.subscribe(Arrays.asList(properties.getProperty("topic"))); new Thread(new Runnable() { @Override public void run() { while(true) { //每5s取一次数据 ConsumerRecords<String, String> records = consumer.poll(5000); //遍历所有的partition for(TopicPartition tp : records.partitions()) { //每次处理多少条数据 int maxpoll = 0; //当前消息的offset long currentOffset = 0; //获取partition中的消息 List<ConsumerRecord<String, String>> partitionRecords = records.records(tp); for (ConsumerRecord<String, String> record : partitionRecords) { try { JSONObject json = JSONObject.parSEObject(record.value()); Integer Operatetype = json.getInteger("operatetype"); if(operatetype != null) { ecarGrouponService.createGrouponDoc(json.toString()); } else { ecarGrouponService.updateGrouponDoc(json.toString()); } } catch (Exception e) { logger.error(e); e.printStackTrace(); } currentOffset = record.offset(); maxpoll++; if(maxpoll > 20) { break; } } //指定消费到的位置 long lastoffset = currentOffset + 1; //指定下次poll的位置 consumer.seek(tp, lastoffset); consumer.commitSync(Collections.singletonMap(tp, new OffsetAndMetadata(lastoffset))); } } } }).start(); } catch (Exception e) { logger.error(e); e.printStackTrace(); } }
发表评论 共有条评论
用户名: 密码:
验证码: 匿名发表