The main internal structure of DStream is shown below:

abstract class DStream[T: ClassManifest] (
    @transient protected[streaming] var ssc: StreamingContext
  ) extends Serializable with Logging {

  initLogging()

  // =======================================================================
  // Methods that should be implemented by subclasses of DStream
  // =======================================================================

  /** Time interval after which the DStream generates a RDD */
  def slideDuration: Duration

  /** List of parent DStreams on which this DStream depends on */
  def dependencies: List[DStream[_]]

  /** Method that generates a RDD for the given time */
  /** This is the core method of DStream: every subclass must implement compute().
      Each kind of DStream provides its own logic here, and the result of the
      computation is the newly generated RDD. */
  def compute(validTime: Time): Option[RDD[T]]

  // =======================================================================
  // Methods and fields available on all DStreams
  // =======================================================================

  // RDDs generated, marked as protected[streaming] so that testsuites can access it
  /** The HashMap of RDDs that each DStream maintains internally. A DStream essentially
      wraps a set of RDDs keyed by Time, and every operation on the DStream is internally
      mapped onto operations on these RDDs. */
  @transient
  protected[streaming] var generatedRDDs = new HashMap[Time, RDD[T]]()

  // Time zero for the DStream
  protected[streaming] var zeroTime: Time = null

  // Duration for which the DStream will remember each RDD created
  protected[streaming] var rememberDuration: Duration = null

  // Storage level of the RDDs in the stream
  protected[streaming] var storageLevel: StorageLevel = StorageLevel.NONE

  // Checkpoint details
  protected[streaming] val mustCheckpoint = false
  protected[streaming] var checkpointDuration: Duration = null
  protected[streaming] val checkpointData = new DStreamCheckpointData(this)

  // Reference to whole DStream graph
  /** All DStreams are registered with the DStreamGraph, which is used to execute
      every DStream together with all of its dependencies. */
  protected[streaming] var graph: DStreamGraph = null

  protected[streaming] def isInitialized = (zeroTime != null)

  // Duration for which the DStream requires its parent DStream to remember each RDD created
  protected[streaming] def parentRememberDuration = rememberDuration

  ...

A DStream internally maintains a time series of RDDs; transformations and output operations on a DStream are translated into transformations and output operations on those RDDs. Let's look at how computation on a DStream is mapped onto computation on RDDs:

protected[streaming] def getOrCompute(time: Time): Option[RDD[T]] = {
  // If this DStream was not initialized (i.e., zeroTime not set), then do it
  // If RDD was already generated, then retrieve it from HashMap
  generatedRDDs.get(time) match {
    // If an RDD was already generated and is being reused, then
    // probably all RDDs in this DStream will be reused and hence should be cached
    case Some(oldRDD) => Some(oldRDD)
    // if RDD was not generated, and if the time is valid
    // (based on sliding time of this DStream), then generate the RDD
    case None => {
      if (isTimeValid(time)) {
        /** For every batch, the DStream calls the compute() implemented by its
            subclass to produce the new RDD. */
        compute(time) match {
          case Some(newRDD) =>
            if (storageLevel != StorageLevel.NONE) {
              newRDD.persist(storageLevel)
              logInfo("Persisting RDD " + newRDD.id + " for time " + time +
                " to " + storageLevel + " at time " + time)
            }
            if (checkpointDuration != null &&
                (time - zeroTime).isMultipleOf(checkpointDuration)) {
              newRDD.checkpoint()
              logInfo("Marking RDD " + newRDD.id + " for time " + time +
                " for checkpointing at time " + time)
            }
            /** The newly produced RDD is stored in the HashMap. */
            generatedRDDs.put(time, newRDD)
            Some(newRDD)
          case None => None
        }
      } else {
        None
      }
    }
  }
}
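To make the contract between compute() and getOrCompute() concrete, here is a minimal sketch of a custom DStream subclass. It is not Spark source code: the class name ConstantDStream is made up (loosely modeled on the built-in ConstantInputDStream), it is written against the old, pre-Apache API quoted above, and it assumes the class lives in the spark.streaming package so that protected[streaming] members such as ssc and graph are visible.

// A sketch only, against the old (0.7-era) API quoted above; ConstantDStream is a
// hypothetical name. Placed in spark.streaming so protected[streaming] fields resolve.
package spark.streaming

import spark.RDD

class ConstantDStream[T: ClassManifest](
    ssc_ : StreamingContext,
    rdd: RDD[T]
  ) extends DStream[T](ssc_) {

  /** No parent DStreams: this behaves like an input stream at the edge of the graph. */
  override def dependencies: List[DStream[_]] = List()

  /** Produce one RDD per batch interval of the graph. */
  override def slideDuration: Duration = ssc.graph.batchDuration

  /** Called by getOrCompute(time) the first time a batch time is requested; the
      returned RDD is then persisted/checkpointed as needed and cached in generatedRDDs. */
  override def compute(validTime: Time): Option[RDD[T]] = Some(rdd)
}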
Each job that is submitted calls getOrCompute() for its batch time, as generateJob() shows:

protected[streaming] def generateJob(time: Time): Option[Job] = {
  getOrCompute(time) match {
    case Some(rdd) => {
      val jobFunc = () => {
        val emptyFunc = { (iterator: Iterator[T]) => {} }
        context.sparkContext.runJob(rdd, emptyFunc)
      }
      Some(new Job(time, jobFunc))
    }
    case None => None
  }
}

Job & Scheduler

From DStream we know that when generateJob() is called, the DStream computes or transforms itself through getOrCompute(). So when does Spark Streaming call generateJob()? When a StreamingContext is instantiated, the user is required to supply a batchDuration, which defines the period of the recurring job: every time a batchDuration elapses, a new job is generated to compute the DStreams. This can be seen in the Scheduler code:

val clockClass = System.getProperty("spark.streaming.clock", "spark.streaming.util.SystemClock")
val clock = Class.forName(clockClass).newInstance().asInstanceOf[Clock]

/** Spark Streaming creates a recurring timer inside the Scheduler. Its period is the
    user-defined batchDuration, and on every tick it calls the Scheduler's generateJobs(). */
val timer = new RecurringTimer(clock, ssc.graph.batchDuration.milliseconds,
  longTime => generateJobs(new Time(longTime)))

The code of generateJobs() is shown below: the Scheduler's generateJobs() calls DStreamGraph's generateJobs() and runs each resulting job through the JobManager.

def generateJobs(time: Time) {
  SparkEnv.set(ssc.env)
  logInfo("\n-----------------------------------------------------\n")
  graph.generateJobs(time).foreach(jobManager.runJob)
  latestTime = time
  doCheckpoint(time)
}

In DStreamGraph, generateJobs() looks like this:

def generateJobs(time: Time): Seq[Job] = {
  this.synchronized {
    logInfo("Generating jobs for time " + time)
    val jobs = outputStreams.flatMap(outputStream => outputStream.generateJob(time))
    logInfo("Generated " + jobs.length + " jobs for time " + time)
    jobs
  }
}

generateJob() is called on every output stream to transform or compute its DStream. Computing an output stream depends on computing its dependencies, so in the end all of its dependencies are computed as well, yielding the final output stream. All of these operations are executed only after StreamingContext's start function is called.

def start() {
  if (checkpointDir != null && checkpointDuration == null && graph != null) {
    checkpointDuration = graph.batchDuration
  }
  validate()

  /** StreamingContext registers and starts all of the input streams. */
  val networkInputStreams = graph.getInputStreams().filter(s => s match {
      case n: NetworkInputDStream[_] => true
      case _ => false
    }).map(_.asInstanceOf[NetworkInputDStream[_]]).toArray

  if (networkInputStreams.length > 0) {
    // Start the network input tracker (must start before receivers)
    networkInputTracker = new NetworkInputTracker(this, networkInputStreams)
    networkInputTracker.start()
  }
  Thread.sleep(1000)

  // Start the Scheduler to drive the streaming computation
  scheduler = new Scheduler(this)
  scheduler.start()
}

By now you should have a basic understanding of how Spark Streaming is used and how it is structured internally. This article closes with a flow diagram of what happens after Spark Streaming starts.
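As an end-to-end reference, the following is a minimal driver-program sketch that exercises the whole path traced above: it sets the batchDuration that the RecurringTimer uses, builds a small DStream lineage, registers an output stream in the DStreamGraph via print(), and calls start() to launch the NetworkInputTracker and the Scheduler. It is only a sketch against the old, pre-Apache API used in this article; "local[2]", the application name, the host/port, and Seconds(1) are example values, not part of the quoted source.

// A sketch only: assumes the old spark.streaming package layout referenced in this article.
import spark.streaming.{Seconds, StreamingContext}
import spark.streaming.StreamingContext._   // implicit conversions for pair DStream ops

object NetworkWordCountSketch {
  def main(args: Array[String]) {
    // batchDuration = Seconds(1): the RecurringTimer fires generateJobs() every second
    val ssc = new StreamingContext("local[2]", "NetworkWordCountSketch", Seconds(1))

    // A NetworkInputDStream; it is started by the NetworkInputTracker inside ssc.start()
    val lines = ssc.socketTextStream("localhost", 9999)

    // Each transformation creates a new DStream whose dependencies point at its parent
    val words = lines.flatMap(_.split(" "))
    val counts = words.map(w => (w, 1)).reduceByKey(_ + _)

    // print() registers an output stream in the DStreamGraph; its generateJob()
    // is what the Scheduler's generateJobs() calls for every batch
    counts.print()

    // start() launches the NetworkInputTracker and the Scheduler's recurring timer
    ssc.start()
  }
}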