DataSet以Catalyst逻辑执行计划表示,并且数据以编码的二进制形式被存储,不需要反序列化就可以执行sorting、shuffle等操作。DataSet创立需要一个显式的Encoder,把对象序列化为二进制,可以把对象的scheme映射为SparkSQl类型,然而RDD依赖于运行时反射机制。通过上面两点,DataSet的性能比RDD的要好很多DataFrame和DataSet比较Dataset可以认为是DataFrame的一个特例,主要区别是Dataset每一个record存储的是一个强类型值而不是一个Row。因此具有如下三个特点:1.DataSet可以在编译时检查类型2.是面向对象的编程接口。用Wordcount举例:[Scala] 纯文本查看 复制代码?[Scala] 纯文本查看 复制代码?
0102030405060708091011 //DataFrame
// Load a text file and interPRet each line as a java.lang.String
val
ds
=
sqlContext.read.text(
"/home/spark/1.6/lines"
).as[String]
val
result
=
ds
.flatMap(
_
.split(
" "
))
// Split on whitespace
.filter(
_
!
=
""
)
// Filter empty words
.toDF()
// Convert to DataFrame to perform aggregation / sorting
.groupBy($
"value"
)
// Count number of occurences of each word
.agg(count(
"*"
) as
"numOccurances"
)
.orderBy($
"numOccurances"
desc)
// Show most common words first
3.后面版本DataFrame会继承DataSet,DataFrame是面向Spark SQL的接口。DataFrame和DataSet可以相互转化,df.as[ElementType]这样可以把DataFrame转化为DataSet,ds.toDF()这样可以把DataSet转化为DataFrame。场景什么时候用RDD?使用RDD的一般场景:你需要使用low-level的transformation和action来控制你的数据集;你得数据集非结构化,比如,流媒体或者文本流;你想使用函数式编程来操作你得数据,而不是用特定领域语言(DSL)表达;你不在乎schema,比如,当通过名字或者列处理(或访问)数据属性不在意列式存储格式;你放弃使用DataFrame和Dataset来优化结构化和半结构化数据集RDD在Apache Spark 2.0中惨遭抛弃?答案当然是 NO !通过后面的描述你会得知:Spark用户可以在RDD,DataFrame和Dataset三种数据集之间无缝转换,而是只需使用超级简单的API方法。什么时候使用DataFrame或者Dataset?你想使用丰富的语义,high-level抽象,和特定领域语言API,那你可DataFrame或者Dataset;你处理的半结构化数据集需要high-level表达, filter,map,aggregation,average,sum ,SQL 查询,列式访问和使用lambda函数,那你可DataFrame或者Dataset;你想利用编译时高度的type-safety,Catalyst优化和Tungsten的code生成,那你可DataFrame或者Dataset;你想统一和简化API使用跨Spark的Library,那你可DataFrame或者Dataset;如果你是一个R使用者,那你可DataFrame或者Dataset;如果你是一个Python使用者,那你可DataFrame或者Dataset;你可以无缝的把DataFrame或者Dataset转化成一个RDD,只需简单的调用 .rdd:[Scala] 纯文本查看 复制代码?
1234567 //DataSet,完全使用scala编程,不要切换到DataFrame
val
wordCount
=
ds.flatMap(
_
.split(
" "
))
.filter(
_
!
=
""
)
.groupBy(
_
.toLowerCase())
// Instead of grouping on a column expression (i.e. $"value") we pass a lambda function
.count()
12345678 // select specific fields from the Dataset, apply a predicate
// using the where() method, convert to an RDD, and show first 10
// RDD rows
val
deviceEventsDS
=
ds.select($
"device_name"
, $
"cca3"
, $
"c02_level"
).where($
"c02_level"
>
1300
)
// convert to RDDs and take the first 10 rows
val
eventsRDD
=
deviceEventsDS.rdd.take(
10
)
新闻热点
疑难解答