
Flume + Kafka + Storm: Using Them Together in a Pseudo-Distributed Environment

Contents:

  I. What are Flume, Kafka, and Storm, and how are they installed?
  II. How are Flume, Kafka, and Storm used together?
    1) What is the underlying principle?
    2) Integrating Flume with Kafka
    3) Integrating Kafka with Storm
    4) Integrating Flume, Kafka, and Storm

  I. What are Flume, Kafka, and Storm, and how are they installed?

  For an introduction to Flume, see 《Flume1.5.0的安装、部署、简单应用》.
  For an introduction to Kafka, see 《kafka2.9.2的分布式集群安装和demo(java api)测试》.
  For an introduction to Storm, see 《Ubuntu12.04+storm0.9.2分布式集群的搭建》.
  The examples that follow use the configurations described in those three articles.

  II. How are Flume, Kafka, and Storm used together?

    1) What is the underlying principle?

  If you have read the introductions to Flume, Kafka, and Storm carefully, you already know how each of them exchanges messages with the outside world. In the examples that follow, we mainly rewrite Flume's sink so that it calls a Kafka producer to publish messages; on the Storm side, the spout implements the IRichSpout interface and calls a Kafka consumer to receive messages, which are then passed through a few custom Bolts that write out our custom content.

    2) Integrating Flume with Kafka

     # Copy the Kafka-related jars that Flume needs into the lib directory under the Flume installation.
root@m1:/home/hadoop# cp /home/hadoop/kafka_2.9.2-0.8.1.1/libs/kafka_2.9.2-0.8.1.1.jar /home/hadoop/flume-1.5.0-bin/lib
root@m1:/home/hadoop# cp /home/hadoop/kafka_2.9.2-0.8.1.1/libs/scala-library-2.9.2.jar /home/hadoop/flume-1.5.0-bin/lib
root@m1:/home/hadoop# cp /home/hadoop/kafka_2.9.2-0.8.1.1/libs/metrics-core-2.2.0.jar /home/hadoop/flume-1.5.0-bin/lib
     # Write the sink.java file, export it from Eclipse as a jar, and put it in the flume-1.5.0-bin/lib directory. The project needs to reference flume-ng-configuration-1.5.0.jar, flume-ng-sdk-1.5.0.jar, flume-ng-core-1.5.0.jar, zkclient-0.3.jar, and commons-logging-1.1.1.jar. All of these jars can be found under the Flume directory; if you cannot find one, search for it with the find command.
package idoall.cloud.flume.sink;

import java.util.Properties;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.flume.Channel;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.Transaction;
import org.apache.flume.conf.Configurable;
import org.apache.flume.sink.AbstractSink;

public class KafkaSink extends AbstractSink implements Configurable {

    private static final Log logger = LogFactory.getLog(KafkaSink.class);

    private String topic;
    private Producer<String, String> producer;

    // Called once when the sink is configured: set the target topic and build the Kafka producer.
    public void configure(Context context) {
        topic = "idoall_testTopic";
        Properties props = new Properties();
        props.setProperty("metadata.broker.list", "m1:9092,m2:9092,s1:9092,s2:9092");
        props.setProperty("serializer.class", "kafka.serializer.StringEncoder");
        props.put("partitioner.class", "idoall.cloud.kafka.Partitionertest");
        props.put("zookeeper.connect", "m1:2181,m2:2181,s1:2181,s2:2181/kafka");
        props.setProperty("num.partitions", "4"); // props.put("request.required.acks", "1");
        ProducerConfig config = new ProducerConfig(props);
        producer = new Producer<String, String>(config);
        logger.info("KafkaSink初始化完成.");
    }

    // Called repeatedly by the sink runner: take one event from the channel and publish it to Kafka.
    public Status process() throws EventDeliveryException {
        Channel channel = getChannel();
        Transaction tx = channel.getTransaction();
        try {
            tx.begin();
            Event e = channel.take();
            if (e == null) {
                tx.rollback();
                return Status.BACKOFF;
            }
            KeyedMessage<String, String> data = new KeyedMessage<String, String>(topic, new String(e.getBody()));
            producer.send(data);
            logger.info("flume向kafka发送消息:" + new String(e.getBody()));
            tx.commit();
            return Status.READY;
        } catch (Exception e) {
            logger.error("Flume KafkaSinkException:", e);
            tx.rollback();
            return Status.BACKOFF;
        } finally {
            tx.close();
        }
    }
}
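     Note that the producer configuration above references a custom partitioner class, idoall.cloud.kafka.Partitionertest, whose source is not shown in this article. A minimal sketch of what such a Kafka 0.8-style partitioner could look like is given below; the hashing logic is an assumption, not the original code:

package idoall.cloud.kafka;

import kafka.producer.Partitioner;
import kafka.utils.VerifiableProperties;

// Hypothetical sketch of the custom partitioner referenced by KafkaSink.
// Kafka 0.8.x instantiates partitioners through this VerifiableProperties constructor.
public class Partitionertest implements Partitioner {

    public Partitionertest(VerifiableProperties props) {
    }

    // Spread keyed messages across partitions by the key's hash code.
    public int partition(Object key, int numPartitions) {
        if (key == null) {
            return 0;
        }
        return Math.abs(key.hashCode()) % numPartitions;
    }
}

     Since KafkaSink sends KeyedMessage objects without a key, the partitioner only comes into play for messages that actually carry a key; keyless messages are spread over partitions by the producer itself.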
     # On m1, configure the Flume agent that exchanges data with Kafka
root@m1:/home/hadoop/flume-1.5.0-bin# vi /home/hadoop/flume-1.5.0-bin/conf/kafka.conf

a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = syslogtcp
a1.sources.r1.port = 5140
a1.sources.r1.host = localhost
a1.sources.r1.channels = c1

# Describe the sink
a1.sinks.k1.type = idoall.cloud.flume.sink.KafkaSink

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
     # On m1, m2, s1, and s2, start Kafka (if you are not sure how, see 《kafka2.9.2的分布式集群安装和demo(java api)测试》, which covers installing, configuring, and starting Kafka), then start a message consumer on s1.
root@m1:/home/hadoop# /home/hadoop/kafka_2.9.2-0.8.1.1/bin/kafka-server-start.sh /home/hadoop/kafka_2.9.2-0.8.1.1/config/server.properties &
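     The consumer on s1 is the standard Kafka console consumer; the exact command also appears at the top of the consumer output further below. For reference (make sure the --topic argument matches the topic that KafkaSink actually publishes to):

root@s1:/home/hadoop# /home/hadoop/kafka_2.9.2-0.8.1.1/bin/kafka-console-consumer.sh --zookeeper m1:2181 --topic flume-kafka-storm-001 --from-beginning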
     # Start Flume on m1
root@m1:/home/hadoop# /home/hadoop/flume-1.5.0-bin/bin/flume-ng agent -c . -f /home/hadoop/flume-1.5.0-bin/conf/kafka.conf -n a1 -Dflume.root.logger=INFO,console
# Only part of the log output is shown below
14/08/19 11:36:34 INFO sink.KafkaSink: KafkaSink初始化完成.
14/08/19 11:36:34 INFO node.AbstractConfigurationProvider: Channel c1 connected to [r1, k1]
14/08/19 11:36:34 INFO node.Application: Starting new configuration:{ sourceRunners:{r1=EventDrivenSourceRunner: { source:org.apache.flume.source.SyslogTcpSource{name:r1,state:IDLE} }} sinkRunners:{k1=SinkRunner: { policy:org.apache.flume.sink.DefaultSinkProcessor@2a9e3ba7 counterGroup:{ name:null counters:{} } }} channels:{c1=org.apache.flume.channel.MemoryChannel{name: c1}} }
14/08/19 11:36:34 INFO node.Application: Starting Channel c1
14/08/19 11:36:34 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: CHANNEL, name: c1: Successfully registered new MBean.
14/08/19 11:36:34 INFO instrumentation.MonitoredCounterGroup: Component type: CHANNEL, name: c1 started
14/08/19 11:36:34 INFO node.Application: Starting Sink k1
14/08/19 11:36:34 INFO node.Application: Starting Source r1
14/08/19 11:36:34 INFO source.SyslogTcpSource: Syslog TCP Source starting...
     # Open another window on m1 and test sending a syslog message to Flume
root@m1:/home/hadoop# echo "hello idoall.org syslog" | nc localhost 5140
     # In the Flume window opened on m1, the last line of output shows that Flume has sent the message to Kafka
14/08/19 11:36:34 INFO sink.KafkaSink: KafkaSink初始化完成.
14/08/19 11:36:34 INFO node.AbstractConfigurationProvider: Channel c1 connected to [r1, k1]
14/08/19 11:36:34 INFO node.Application: Starting new configuration:{ sourceRunners:{r1=EventDrivenSourceRunner: { source:org.apache.flume.source.SyslogTcpSource{name:r1,state:IDLE} }} sinkRunners:{k1=SinkRunner: { policy:org.apache.flume.sink.DefaultSinkProcessor@2a9e3ba7 counterGroup:{ name:null counters:{} } }} channels:{c1=org.apache.flume.channel.MemoryChannel{name: c1}} }
14/08/19 11:36:34 INFO node.Application: Starting Channel c1
14/08/19 11:36:34 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: CHANNEL, name: c1: Successfully registered new MBean.
14/08/19 11:36:34 INFO instrumentation.MonitoredCounterGroup: Component type: CHANNEL, name: c1 started
14/08/19 11:36:34 INFO node.Application: Starting Sink k1
14/08/19 11:36:34 INFO node.Application: Starting Source r1
14/08/19 11:36:34 INFO source.SyslogTcpSource: Syslog TCP Source starting...
14/08/19 11:38:05 WARN source.SyslogUtils: Event created from Invalid Syslog data.
14/08/19 11:38:05 INFO client.ClientUtils$: Fetching metadata from broker id:3,host:s2,port:9092 with correlation id 0 for 1 topic(s) Set(idoall_testTopic)
14/08/19 11:38:05 INFO producer.SyncProducer: Connected to s2:9092 for producing
14/08/19 11:38:05 INFO producer.SyncProducer: Disconnecting from s2:9092
14/08/19 11:38:05 INFO producer.SyncProducer: Connected to m1:9092 for producing
14/08/19 11:38:05 INFO sink.KafkaSink: flume向kafka发送消息:hello idoall.org syslog
     # In the Kafka consumer window opened earlier on s1, the same message sent from Flume also shows up, which means the Flume-Kafka integration is working.
root@s1:/home/hadoop# /home/hadoop/kafka_2.9.2-0.8.1.1/bin/kafka-console-consumer.sh --zookeeper m1:2181 --topic flume-kafka-storm-001 --from-beginning
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
[2014-08-11 14:22:12,165] INFO [ReplicaFetcherManager on broker 3] Removed fetcher for partitions [flume-kafka-storm-001,1] (kafka.server.ReplicaFetcherManager)
[2014-08-11 14:22:12,218] WARN [KafkaApi-3] Produce request with correlation id 2 from client  on partition [flume-kafka-storm-001,1] failed due to Topic flume-kafka-storm-001 either doesn't exist or is in the process of being deleted (kafka.server.KafkaApis)
[2014-08-11 14:22:12,223] INFO Completed load of log flume-kafka-storm-001-1 with log end offset 0 (kafka.log.Log)
[2014-08-11 14:22:12,250] INFO Created log for partition [flume-kafka-storm-001,1] in /home/hadoop/kafka_2.9.2-0.8.1.1/kafka-logs with properties {segment.index.bytes -> 10485760, file.delete.delay.ms -> 60000, segment.bytes -> 536870912, flush.ms -> 9223372036854775807, delete.retention.ms -> 86400000, index.interval.bytes -> 4096, retention.bytes -> -1, cleanup.policy -> delete, segment.ms -> 604800000, max.message.bytes -> 1000012, flush.messages -> 9223372036854775807, min.cleanable.dirty.ratio -> 0.5, retention.ms -> 604800000}. (kafka.log.LogManager)
[2014-08-11 14:22:12,267] WARN Partition [flume-kafka-storm-001,1] on broker 3: No checkpointed highwatermark is found for partition [flume-kafka-storm-001,1] (kafka.cluster.Partition)
[2014-08-11 14:22:12,375] INFO Closing socket connection to /192.168.1.50. (kafka.network.Processor)
hello idoall.org syslog
    3) Integrating Kafka with Storm

     # We first write the code in Eclipse. Before writing any code, Maven has to be configured; the pom.xml looks like this (a sketch of the spout follows after the pom):
<?xml version="1.0" encoding="utf-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>idoall.cloud</groupId>
    <artifactId>idoall.cloud</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <packaging>jar</packaging>
    <name>idoall.cloud</name>
    <url>http://maven.apache.org</url>
    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>
    <repositories>
        <repository>
            <id>github-releases</id>
            <url>http://oss.sonatype.org/content/repositories/github-releases/</url>
        </repository>
        <repository>
            <id>clojars.org</id>
            <url>http://clojars.org/repo</url>
        </repository>
    </repositories>
    <dependencies>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.11</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>com.sksamuel.kafka</groupId>
            <artifactId>kafka_2.10</artifactId>
            <version>0.8.0-beta1</version>
        </dependency>
        <dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
            <version>1.2.14</version>
        </dependency>
        <dependency>
            <groupId>storm</groupId>
            <artifactId>storm</artifactId>
            <version>0.9.0.1</version>
            <!-- keep storm out of the jar-with-dependencies -->
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>commons-collections</grou
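     The pom.xml excerpt above is cut off in this copy of the article, and the spout and bolt sources are not included here. As described in the principle section, the spout implements the IRichSpout interface and pulls messages from Kafka with a consumer. The class below is only a minimal sketch of that idea, assuming the Kafka 0.8 high-level consumer and Storm 0.9 APIs pulled in by the pom; the class name, consumer group, and topic are illustrative, not the article's original code (BaseRichSpout is a convenience base class that implements IRichSpout):

package idoall.cloud.storm.spout;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.LinkedBlockingQueue;

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;

import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

// Hypothetical sketch: a spout that reads messages from Kafka with the
// high-level consumer and emits each message as a one-field tuple.
public class KafkaSpout extends BaseRichSpout {

    private static final long serialVersionUID = 1L;

    private SpoutOutputCollector collector;
    private ConsumerConnector consumer;
    // Messages are handed from the consumer thread to nextTuple() through this queue.
    private final LinkedBlockingQueue<String> queue = new LinkedBlockingQueue<String>();

    @Override
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        this.collector = collector;

        Properties props = new Properties();
        props.put("zookeeper.connect", "m1:2181,m2:2181,s1:2181,s2:2181/kafka"); // assumption: same ZooKeeper as the sink
        props.put("group.id", "idoall_storm_group");                             // illustrative consumer group
        props.put("auto.offset.reset", "smallest");
        consumer = Consumer.createJavaConsumerConnector(new ConsumerConfig(props));

        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put("idoall_testTopic", 1); // the topic KafkaSink publishes to
        Map<String, List<KafkaStream<byte[], byte[]>>> streams = consumer.createMessageStreams(topicCountMap);
        final KafkaStream<byte[], byte[]> stream = streams.get("idoall_testTopic").get(0);

        // Consume on a background thread so nextTuple() never blocks.
        new Thread(new Runnable() {
            public void run() {
                ConsumerIterator<byte[], byte[]> it = stream.iterator();
                while (it.hasNext()) {
                    queue.offer(new String(it.next().message()));
                }
            }
        }).start();
    }

    @Override
    public void nextTuple() {
        String message = queue.poll();
        if (message != null) {
            collector.emit(new Values(message));
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("message"));
    }
}

     A few simple bolts can then subscribe to the "message" field and print or transform it, which is the role the article's custom Bolts play in the topology.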