| 1 | PRotectedstatic Map<GroupTopicPartition, OffsetAndMetadata> offsetMap = newConcurrentHashMap<>(); |
| 01020304050607080910111213141516171819202122 | privatestatic synchronized void startOffsetListener(ConsumerConnector consumerConnector) { Map<String, Integer> topicCountMap = newHashMap<String, Integer>(); topicCountMap.put(consumerOffsetTopic,newInteger(1)); KafkaStream<byte[],byte[]> offsetMsgStream = consumerConnector.createMessageStreams(topicCountMap).get(consumerOffsetTopic).get(0); ConsumerIterator<byte[],byte[]> it = offsetMsgStream.iterator(); while(true) { MessageAndMetadata<byte[],byte[]> offsetMsg = it.next(); if(ByteBuffer.wrap(offsetMsg.key()).getShort() < 2) { try{ GroupTopicPartition commitKey = readMessageKey(ByteBuffer.wrap(offsetMsg.key())); if(offsetMsg.message() == null) { continue; } OffsetAndMetadata commitValue = readMessageValue(ByteBuffer.wrap(offsetMsg.message())); offsetMap.put(commitKey, commitValue); }catch(Exception e) { e.printStackTrace(); } } } } |
| 123456789 | namespace java org.smartloli.kafka.eagle.ipc service KafkaOffsetServer{ string query(1:string group,2:string topic,3:i32 partition), string getOffset(), string sql(1:string sql), string getConsumer(), string getActiverConsumer()} |
| 12345 | <dependency> <groupId>org.smartloli</groupId> <artifactId>jsql-client</artifactId> <version>1.0.0</version></dependency> |
| 1 | JSqlUtils.query(tabSchema, tableName, dataSets, sql); |
正在消费的关系图如下所示:
消费详细 offset 如下所示:
消费和生产的速率图,如下所示:
5.总结这里,说明一下,当 offset 存入到 Kafka 的topic中后,消费线程ID信息并没有记录,不过,我们通过阅读Kafka消费线程ID的组成规则后,可以手动生成,其消费线程ID由:Group+ConsumerLocalAddress+Timespan+UUID(8bit)+PartitionId,由于消费者在其他节点,我们暂时无法确定ConsumerLocalAddress。最后,欢迎大家使用 Kafka 集群监控 ——[Kafka Eagle ],[操作手册 ]。新闻热点
疑难解答