Originally posted at: http://www.VEVb.com/hark0623/p/4170156.html Please credit the source when reposting.
Integrating Spark Streaming with Flume is actually very simple, and there are tutorials for it online.
See http://blog.csdn.net/fighting_one_piece/article/details/40667035 for the steps; that is all you need. I used the first integration approach described there, and ran into all sorts of problems along the way. It took me from about 5:00 in the morning until 18:30 in the evening on 2014.12.17. In hindsight it is all quite simple, but getting there took ages! Live and learn.

Problem 1: The job depends on a number of packages, and they must be bundled into your JAR. Because I am running in Spark on YARN mode, the cluster cannot find the dependencies unless they are packaged in. Where do you get them? Search for them directly on search.maven.org.

The Spark Streaming code is as follows:

    package com.hark

    import java.io.File

    import org.apache.spark.SparkConf
    import org.apache.spark.storage.StorageLevel
    import org.apache.spark.streaming.flume.FlumeUtils
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.StreamingContext._

    /**
     * Created by Administrator on 2014-12-16.
     */
    object SparkStreamingFlumeTest {
      def main(args: Array[String]): Unit = {
        //println("harkhark")

        // Point hadoop.home.dir at the working directory and create a
        // placeholder bin/winutils.exe there.
        val path = new File(".").getCanonicalPath()
        //File workaround = new File(".");
        System.getProperties().put("hadoop.home.dir", path)
        new File("./bin").mkdirs()
        new File("./bin/winutils.exe").createNewFile()

        //val sparkConf = new SparkConf().setAppName("HdfsWordCount").setMaster("local[2]")
        val sparkConf = new SparkConf().setAppName("HdfsWordCount")

        // Create the context with a 20-second batch interval
        val ssc = new StreamingContext(sparkConf, Seconds(20))

        //val hostname = "127.0.0.1"
        val hostname = "localhost"
        val port = 2345
        val storageLevel = StorageLevel.MEMORY_ONLY

        // Receive events from the Flume avro sink on hostname:port
        val flumeStream = FlumeUtils.createStream(ssc, hostname, port, storageLevel)

        // Print the number of events received in each batch
        flumeStream.count().map(cnt => "Received " + cnt + " flume events.").print()

        ssc.start()
        ssc.awaitTermination()
      }
    }
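For Problem 1 above (bundling the dependencies into the JAR), one possible setup is sketched below. The post does not say which build tool was used, so this assumes sbt with the sbt-assembly plugin enabled in project/plugins.sbt. The project name and every version number are assumptions for illustration only; match them to the Spark and Scala versions on your cluster.

```scala
// build.sbt: a hypothetical sketch, not the original author's build.
// The name and all version numbers below are assumptions.
name := "SparkTest"

version := "0.1"

scalaVersion := "2.10.4"

libraryDependencies ++= Seq(
  // Already present in a Spark installation, so marked "provided"
  // and left out of the assembled JAR.
  "org.apache.spark" %% "spark-core" % "1.1.0" % "provided",
  "org.apache.spark" %% "spark-streaming" % "1.1.0" % "provided",
  // Not part of the Spark installation: this one (and its transitive
  // Flume/Avro dependencies) has to be bundled into the application JAR.
  "org.apache.spark" %% "spark-streaming-flume" % "1.1.0"
)
```

With a setup like this, `sbt assembly` would produce a single JAR to pass to spark-submit. Duplicate files among the transitive dependencies may require a merge strategy, which is omitted here.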
The Flume configuration file is as follows:
    # Please paste flume.conf here. Example:

    # Sources, channels, and sinks are defined per
    # agent name, in this case 'tier1'.
    tier1.sources = source1
    tier1.channels = channel1
    tier1.sinks = sink1

    # For each source, channel, and sink, set
    # standard properties.
    tier1.sources.source1.type = exec
    tier1.sources.source1.command = tail -F /opt/data/test3/123
    tier1.sources.source1.channels = channel1
    tier1.channels.channel1.type = memory
    #tier1.sinks.sink1.type = logger
    tier1.sinks.sink1.type = avro
    tier1.sinks.sink1.hostname = localhost
    tier1.sinks.sink1.port = 2345
    tier1.sinks.sink1.channel = channel1

    # Other properties are specific to each type of yhx.hadoop.dn01
    # source, channel, or sink. In this case, we
    # specify the capacity of the memory channel.
    tier1.channels.channel1.capacity = 100

The Spark launch command is as follows:
    spark-submit \
        --driver-memory 512m \
        --executor-memory 512m \
        --executor-cores 1 \
        --num-executors 3 \
        --class com.hark.SparkStreamingFlumeTest \
        --deploy-mode cluster \
        --master yarn \
        /opt/spark/SparkTest.jar

The Flume launch command is as follows:
    flume-ng agent \
        --conf /opt/cloudera-manager/run/cloudera-scm-agent/process/585-flume-AGENT \
        --conf-file /opt/cloudera-manager/run/cloudera-scm-agent/process/585-flume-AGENT/flume.conf \
        --name tier1 \
        -Dflume.root.logger=INFO,console