
Spark Examples

2019-11-06 09:17:24

1. Getting Started

import java.net.URL

import org.apache.spark.{SparkConf, SparkContext}

object HelloSpark {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("Simple application").setMaster("local")
    val sc = new SparkContext(conf)
    sc.addJar("C://Users//asus//IdeaProjects//HelloSpark//target//HelloSpark-1.0-SNAPSHOT.jar")
    // Count occurrences of each URL (the second tab-separated field), map the URL to its host,
    // keep only the "java" host, sort by count descending, and write the result out.
    sc.textFile("D://info.log")
      .map(line => {
        val f = line.split("\t")
        (f(1), 1)
      })
      .reduceByKey(_ + _)
      .map(x => {
        val host = new URL(x._1).getHost
        (host, x._2)
      })
      .filter(_._1 == "java")
      .sortBy(_._2, ascending = false)
      .saveAsTextFile("D://out2")
    sc.stop()
  }
}
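To try the same pipeline without the local log file, the transformations can be run on an in-memory RDD. This is a minimal sketch, not part of the original job; the sample lines are hypothetical and only assume the format the code implies (tab-separated fields, with a URL in the second field):

// Hypothetical sample lines: the second tab-separated field is a URL whose host is "java".
val sample = sc.parallelize(Seq(
  "20191106\thttp://java/course/1",
  "20191106\thttp://java/course/2",
  "20191106\thttp://java/course/1"))
val counts = sample.map(line => (line.split("\t")(1), 1))
  .reduceByKey(_ + _)
  .map(x => (new URL(x._1).getHost, x._2))
  .filter(_._1 == "java")
  .sortBy(_._2, ascending = false)
counts.collect().foreach(println)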

2. Parallelize

import org.apache.spark.{SparkConf, SparkContext}

object H {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("Simple Application").setMaster("local")
    val sc = new SparkContext(conf)
    // With ascending = false this sorts by the second field descending,
    // and by the third field ascending when the second fields are equal.
    val a = sc.parallelize(List(("A", 90, 27), ("B", 91, 22), ("C", 90, 25)))
      .sortBy(x => Girl(x._2, x._3), ascending = false)
      .collect()
      .toBuffer
    println(a)
  }
}

// Custom ordering used as the sort key above: compare by a first, then by b (reversed) on ties.
case class Girl(a: Int, b: Int) extends Ordered[Girl] with Serializable {
  override def compare(that: Girl): Int = {
    if (this.a == that.a) {
      that.b - this.b
    } else {
      this.a - that.a
    }
  }
}
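The same ordering can also be expressed without a custom class by sorting on a tuple key, negating the first field to get the descending part. This is an equivalent sketch rather than the author's code:

// Equivalent sort: second field descending, third field ascending on ties.
val b = sc.parallelize(List(("A", 90, 27), ("B", 91, 22), ("C", 90, 25)))
  .sortBy(x => (-x._2, x._3))
  .collect()
  .toBuffer
println(b)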

3. Spark Streaming (Socket)

import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}

object SocketSparkStreaming {
  // Merge the counts of the current batch (t._2) with the running total (t._3) for each key.
  val updateFunc = (iter: Iterator[(String, Seq[Int], Option[Int])]) => {
    //iter.flatMap(it => Some(it._2.sum + it._3.getOrElse(0)).map(x => (it._1, x)))
    //iter.map { case (x, y, z) => Some(y.sum + z.getOrElse(0)).map(m => (x, m)) }
    iter.map(t => (t._1, t._2.sum + t._3.getOrElse(0)))
  }

  def main(args: Array[String]): Unit = {
    LoggerLevels.setStreamingLogLevels()
    val conf = new SparkConf().setAppName("Spark Streaming").setMaster("local[2]")
    val sc = new SparkContext(conf)
    sc.setCheckpointDir("d://ck")   // updateStateByKey requires a checkpoint directory
    val ssc = new StreamingContext(sc, Seconds(5))
    val lines = ssc.socketTextStream("192.168.1.101", 8888)
    val res = lines.flatMap(_.split(" "))
      .map((_, 1))
      .updateStateByKey(updateFunc, new HashPartitioner(sc.defaultParallelism), true)
    res.print()
    ssc.start()
    ssc.awaitTermination()
  }
}
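The streaming examples call LoggerLevels.setStreamingLogLevels(), a helper that the article never shows. A minimal stand-in (the implementation here is assumed; only the object and method names come from the code above) just quiets the default logging:

import org.apache.log4j.{Level, Logger}

// Assumed helper: raise the root log level so the streaming output stays readable.
object LoggerLevels {
  def setStreamingLogLevels(): Unit = {
    Logger.getRootLogger.setLevel(Level.WARN)
  }
}

To feed the socket stream during a test, a plain netcat listener on the source host works, e.g. nc -lk 8888, then type words separated by spaces.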

4. Spark Streaming (Flume)

import java.net.InetSocketAddress

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.flume.FlumeUtils

object FlumeSparkStreaming {
  def main(args: Array[String]): Unit = {
    LoggerLevels.setStreamingLogLevels()
    val conf = new SparkConf().setAppName("Spark Streaming").setMaster("local[2]")
    val sc = new SparkContext(conf)
    sc.setCheckpointDir("d://ck")
    val ssc = new StreamingContext(sc, Seconds(5))
    // Pull events from the Flume spark-sink listening on this address.
    val address = Seq(new InetSocketAddress("192.168.1.101", 8888))
    val flumeStream = FlumeUtils.createPollingStream(ssc, address, StorageLevel.MEMORY_AND_DISK_SER)
    val words = flumeStream.flatMap(x => new String(x.event.getBody.array()).split(" ")).map((_, 1))
    val result = words.reduceByKey(_ + _)
    result.print()
    ssc.start()
    ssc.awaitTermination()
  }
}
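createPollingStream is the pull-based integration: the Flume agent writes to a Spark sink (the spark-streaming-flume-sink jar needs to be on the agent's classpath) and the job polls events from it. FlumeUtils also has a push-based variant; a one-line sketch, reusing the same host and port purely for illustration:

// Push-based alternative: the Flume agent avro-sinks events directly to this receiver.
val pushStream = FlumeUtils.createStream(ssc, "192.168.1.101", 8888)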

5. Kafka WordCount

import org.apache.spark.{HashPartitioner, SparkConf}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

object KafkaWordCount {
  // Merge the counts of the current batch with the running total for each key.
  val updateFunc = (iter: Iterator[(String, Seq[Int], Option[Int])]) => {
    iter.map(t => (t._1, t._2.sum + t._3.getOrElse(0)))
  }

  def main(args: Array[String]): Unit = {
    LoggerLevels.setStreamingLogLevels()
    val Array(zkQuorum, group, topics, numThreads) = args
    val conf = new SparkConf().setAppName("Spark Streaming").setMaster("local[2]")
    val ssc = new StreamingContext(conf, Seconds(5))
    ssc.checkpoint("d://ckkafka")
    val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
    val result = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap)
      .map(_._2)                     // keep only the message value
      .flatMap(_.split(" "))
      .map((_, 1))
      .updateStateByKey(updateFunc,
        new HashPartitioner(ssc.sparkContext.defaultParallelism),
        rememberPartitioner = true)
    result.print()
    ssc.start()
    ssc.awaitTermination()
  }
}
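The job reads its configuration from four command-line arguments. The example values below are hypothetical placeholders, shown only to make the argument positions clear:

// args(0) = zkQuorum    e.g. "192.168.1.101:2181"  (ZooKeeper quorum used by the Kafka receiver)
// args(1) = group       e.g. "group1"              (consumer group id)
// args(2) = topics      e.g. "wordcount"           (comma-separated list of topics)
// args(3) = numThreads  e.g. "2"                   (receiver threads per topic)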

6. Window Functions

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object WindowFunc {
  def main(args: Array[String]): Unit = {
    LoggerLevels.setStreamingLogLevels()
    val conf = new SparkConf().setAppName("Spark Streaming").setMaster("local[2]")
    val ssc = new StreamingContext(conf, Seconds(5))
    ssc.checkpoint("d://ck")
    val lines = ssc.socketTextStream("192.168.1.101", 8888)
    // Count words over a 15-second window that slides forward every 5 seconds.
    val result = lines.flatMap(_.split(" "))
      .map((_, 1))
      .reduceByKeyAndWindow((a: Int, b: Int) => a + b, Seconds(15), Seconds(5))
    result.print()
    ssc.start()
    ssc.awaitTermination()
  }
}
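reduceByKeyAndWindow can also be given an inverse ("subtract") function, so Spark updates the window incrementally instead of recomputing it from scratch on every slide; this variant needs checkpointing, which the example already enables. A sketch, reusing the same lines DStream:

// Incremental window: add counts entering the window, subtract counts leaving it.
val incremental = lines.flatMap(_.split(" "))
  .map((_, 1))
  .reduceByKeyAndWindow((a: Int, b: Int) => a + b, (a: Int, b: Int) => a - b, Seconds(15), Seconds(5))
incremental.print()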