首页 > 编程 > Java > 正文

Kafka简单客户端编程实例

2019-11-26 10:59:33
字体:
来源:转载
供稿:网友

今天,我们给大家带来一篇如何利用Kafka的API进行客户端编程的文章,这篇文章很简单,就是利用Kafka的API创建一个生产者和消费者,生产者不断向Kafka写入消息,消费者则不断消费Kafka的消息。下面是具体的实例代码。

一、创建配置类Config

这个类很简单,只是存放了两个常量,一个是话题TOPIC,一个是线程数THREADS

package com.lya.kafka;  /**  * 配置项  * @author liuyazhuang  *  */ public class Config {    /**   * 话题   */  public static final String TOPIC = "wordcount";  /**   * 线程数   */  public static final Integer THREADS = 1; } 

二、编程生产者类ProducerDemo

这个类的主要作用就是向Kafka写入相应的消息,并且将消息写入wordcount话题。

package com.lya.kafka;  import java.util.Properties;  import kafka.javaapi.producer.Producer; import kafka.producer.KeyedMessage; import kafka.producer.ProducerConfig;  /**  * 生产者实例  * @author liuyazhuang  *  */ public class ProducerDemo {  public static void main(String[] args) throws Exception {   Properties props = new Properties();   props.put("zk.connect", "192.168.209.121:2181");   props.put("metadata.broker.list","192.168.209.121:9092");   props.put("serializer.class", "kafka.serializer.StringEncoder");   props.put("zk.connectiontimeout.ms", "15000");   ProducerConfig config = new ProducerConfig(props);   Producer<String, String> producer = new Producer<String, String>(config);    // 发送业务消息   // 读取文件 读取内存数据库 读socket端口   for (int i = 1; i <= 100; i++) {    Thread.sleep(500);    producer.send(new KeyedMessage<String, String>(Config.TOPIC,      "this number ===>>> " + i));   }   } } 

三、编写消息者类ConsumerDemo

这个类的主要作用就是消费Kafka中wordcount话题的消息。

package com.lya.kafka;  import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Properties;  import kafka.consumer.Consumer; import kafka.consumer.ConsumerConfig; import kafka.consumer.KafkaStream; import kafka.javaapi.consumer.ConsumerConnector; import kafka.message.MessageAndMetadata;  /**  * 消费者实例  * @author liuyazhuang  *  */ public class ConsumerDemo {     public static void main(String[] args) {      Properties props = new Properties();   props.put("zookeeper.connect", "192.168.209.121:2181");   props.put("group.id", "1111");   props.put("auto.offset.reset", "smallest");   props.put("zk.connectiontimeout.ms", "15000");    ConsumerConfig config = new ConsumerConfig(props);   ConsumerConnector consumer =Consumer.createJavaConsumerConnector(config);   Map<String, Integer> topicCountMap = new HashMap<String, Integer>();   topicCountMap.put(Config.TOPIC, Config.THREADS);   Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);   List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(Config.TOPIC);      for(final KafkaStream<byte[], byte[]> kafkaStream : streams){    new Thread(new Runnable() {     @Override     public void run() {      for(MessageAndMetadata<byte[], byte[]> mm : kafkaStream){       String msg = new String(mm.message());       System.out.println(msg);      }     }        }).start();      }  } } 

四、运行实例

首先,运行消费者类ConsumerDemo
运行结果如下:

没有打印任何信息。
此时,我们运行生产者类ProducerDemo
我们再次打开消费者的控制台查看如下:

打印出了生产者生产的消息。
至此,Kafka简单客户端编程实例结束。

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持武林网。

发表评论 共有条评论
用户名: 密码:
验证码: 匿名发表