1 | PRotected static Map<GroupTopicPartition, OffsetAndMetadata> offsetMap = new ConcurrentHashMap<>(); |
01020304050607080910111213141516171819202122 | private static synchronized void startOffsetListener(ConsumerConnector consumerConnector) { Map<String, Integer> topicCountMap = new HashMap<String, Integer>(); topicCountMap.put(consumerOffsetTopic, new Integer( 1 )); KafkaStream< byte [], byte []> offsetMsgStream = consumerConnector.createMessageStreams(topicCountMap).get(consumerOffsetTopic).get( 0 ); ConsumerIterator< byte [], byte []> it = offsetMsgStream.iterator(); while ( true ) { MessageAndMetadata< byte [], byte []> offsetMsg = it.next(); if (ByteBuffer.wrap(offsetMsg.key()).getShort() < 2 ) { try { GroupTopicPartition commitKey = readMessageKey(ByteBuffer.wrap(offsetMsg.key())); if (offsetMsg.message() == null ) { continue ; } OffsetAndMetadata commitValue = readMessageValue(ByteBuffer.wrap(offsetMsg.message())); offsetMap.put(commitKey, commitValue); } catch (Exception e) { e.printStackTrace(); } } } } |
123456789 | namespace java org.smartloli.kafka.eagle.ipc service KafkaOffsetServer{ string query( 1 :string group, 2 :string topic, 3 :i32 partition), string getOffset(), string sql( 1 :string sql), string getConsumer(), string getActiverConsumer() } |
12345 | <dependency> <groupId>org.smartloli</groupId> <artifactId>jsql-client</artifactId> <version> 1.0 . 0 </version> </dependency> |
1 | JSqlUtils.query(tabSchema, tableName, dataSets, sql); |
新闻热点
疑难解答