首页 > 编程 > Python > 正文

DataFrame:通过SparkSql将scala类转为DataFrame的方法

2019-11-25 13:22:00
字体:
来源:转载
供稿:网友

如下所示:

import java.text.DecimalFormatimport com.alibaba.fastjson.JSONimport com.donews.data.AppConfigimport com.typesafe.config.ConfigFactoryimport org.apache.spark.sql.types.{StructField, StructType}import org.apache.spark.sql.{Row, SaveMode, DataFrame, SQLContext}import org.apache.spark.{SparkConf, SparkContext}import org.slf4j.LoggerFactory /** * Created by silentwolf on 2016/6/3. */ case class UserTag(SUUID: String,     MAN: Float,     WOMAN: Float,     AGE10_19: Float,     AGE20_29: Float,     AGE30_39: Float,     AGE40_49: Float,     AGE50_59: Float,     GAME: Float,     MOVIE: Float,     MUSIC: Float,     ART: Float,     POLITICS_NEWS: Float,     FINANCIAL: Float,     EDUCATION_TRAINING: Float,     HEALTH_CARE: Float,     TRAVEL: Float,     AUTOMOBILE: Float,     HOUSE_PROPERTY: Float,     CLOTHING_ACCESSORIES: Float,     BEAUTY: Float,     IT: Float,     BABY_PRODUCT: Float,     FOOD_SERVICE: Float,     HOME_FURNISHING: Float,     SPORTS: Float,     OUTDOOR_ACTIVITIES: Float,     MEDICINE: Float     ) object UserTagTable {  val LOG = LoggerFactory.getLogger(UserOverviewFirst.getClass)  val REP_HOME = s"${AppConfig.HDFS_MASTER}/${AppConfig.HDFS_REP}"  def main(args: Array[String]) {  var startTime = System.currentTimeMillis()  val conf: com.typesafe.config.Config = ConfigFactory.load()  val sc = new SparkContext()  val sqlContext = new SQLContext(sc)  var df1: DataFrame = null  if (args.length == 0) {  println("请输入: appkey , StartTime : 2016-04-10 ,StartEnd :2016-04-11") } else {   var appkey = args(0)   var lastdate = args(1)   df1 = loadDataFrame(sqlContext, appkey, "2016-04-10", lastdate)   df1.registerTempTable("suuidTable")   sqlContext.udf.register("taginfo", (a: String) => userTagInfo(a))  sqlContext.udf.register("intToString", (b: Long) => intToString(b))  import sqlContext.implicits._   //***重点***:将临时表中的suuid和自定函数中Json数据,放入UserTag中。 sqlContext.sql(" select distinct(suuid) AS suuid,taginfo(suuid) from suuidTable group by suuid").map { case Row(suuid: String, taginfo: String) =>  val taginfoObj = JSON.parseObject(taginfo)  UserTag(suuid.toString,   taginfoObj.getFloat("man"),   taginfoObj.getFloat("woman"),   taginfoObj.getFloat("age10_19"),   taginfoObj.getFloat("age20_29"),   taginfoObj.getFloat("age30_39"),   taginfoObj.getFloat("age40_49"),   taginfoObj.getFloat("age50_59"),   taginfoObj.getFloat("game"),   taginfoObj.getFloat("movie"),   taginfoObj.getFloat("music"),   taginfoObj.getFloat("art"),   taginfoObj.getFloat("politics_news"),   taginfoObj.getFloat("financial"),   taginfoObj.getFloat("education_training"),   taginfoObj.getFloat("health_care"),   taginfoObj.getFloat("travel"),   taginfoObj.getFloat("automobile"),   taginfoObj.getFloat("house_property"),   taginfoObj.getFloat("clothing_accessories"),   taginfoObj.getFloat("beauty"),   taginfoObj.getFloat("IT"),   taginfoObj.getFloat("baby_Product"),   taginfoObj.getFloat("food_service"),   taginfoObj.getFloat("home_furnishing"),   taginfoObj.getFloat("sports"),   taginfoObj.getFloat("outdoor_activities"),   taginfoObj.getFloat("medicine")  )}.toDF().registerTempTable("resultTable")   val resultDF = sqlContext.sql(s"select '$appkey' AS APPKEY, '$lastdate' AS DATE,SUUID ,MAN,WOMAN,AGE10_19,AGE20_29,AGE30_39 ," +  "AGE40_49 ,AGE50_59,GAME,MOVIE,MUSIC,ART,POLITICS_NEWS,FINANCIAL,EDUCATION_TRAINING,HEALTH_CARE,TRAVEL,AUTOMOBILE," +  "HOUSE_PROPERTY,CLOTHING_ACCESSORIES,BEAUTY,IT,BABY_PRODUCT ,FOOD_SERVICE ,HOME_FURNISHING ,SPORTS ,OUTDOOR_ACTIVITIES ," +  "MEDICINE from resultTable WHERE SUUID IS NOT NULL")  resultDF.write.mode(SaveMode.Overwrite).options(  Map("table" -> "USER_TAGS", "zkUrl" -> conf.getString("Hbase.url"))  ).format("org.apache.phoenix.spark").save()  } }  def intToString(suuid: Long): String = { suuid.toString() }  def userTagInfo(num1: String): String = {  var de = new DecimalFormat("0.00") var mannum = de.format(math.random).toFloat var man = mannum var woman = de.format(1 - mannum).toFloat  var age10_19num = de.format(math.random * 0.2).toFloat var age20_29num = de.format(math.random * 0.2).toFloat var age30_39num = de.format(math.random * 0.2).toFloat var age40_49num = de.format(math.random * 0.2).toFloat  var age10_19 = age10_19num var age20_29 = age20_29num var age30_39 = age30_39num var age40_49 = age40_49num var age50_59 = de.format(1 - age10_19num - age20_29num - age30_39num - age40_49num).toFloat  var game = de.format(math.random * 1).toFloat var movie = de.format(math.random * 1).toFloat var music = de.format(math.random * 1).toFloat var art = de.format(math.random * 1).toFloat var politics_news = de.format(math.random * 1).toFloat  var financial = de.format(math.random * 1).toFloat var education_training = de.format(math.random * 1).toFloat var health_care = de.format(math.random * 1).toFloat var travel = de.format(math.random * 1).toFloat var automobile = de.format(math.random * 1).toFloat  var house_property = de.format(math.random * 1).toFloat var clothing_accessories = de.format(math.random * 1).toFloat var beauty = de.format(math.random * 1).toFloat var IT = de.format(math.random * 1).toFloat var baby_Product = de.format(math.random * 1).toFloat  var food_service = de.format(math.random * 1).toFloat var home_furnishing = de.format(math.random * 1).toFloat var sports = de.format(math.random * 1).toFloat var outdoor_activities = de.format(math.random * 1).toFloat var medicine = de.format(math.random * 1).toFloat  "{" + "/"man/"" + ":" + man + "," + "/"woman/"" + ":" + woman + "," + "/"age10_19/"" + ":" + age10_19 + "," + "/"age20_29/"" + ":" + age20_29 + "," +  "/"age30_39/"" + ":" + age30_39 + "," + "/"age40_49/"" + ":" + age40_49 + "," + "/"age50_59/"" + ":" + age50_59 + "," + "/"game/"" + ":" + game + "," +  "/"movie/"" + ":" + movie + "," + "/"music/"" + ":" + music + "," + "/"art/"" + ":" + art + "," + "/"politics_news/"" + ":" + politics_news + "," +  "/"financial/"" + ":" + financial + "," + "/"education_training/"" + ":" + education_training + "," + "/"health_care/"" + ":" + health_care + "," +  "/"travel/"" + ":" + travel + "," + "/"automobile/"" + ":" + automobile + "," + "/"house_property/"" + ":" + house_property + "," + "/"clothing_accessories/"" + ":" + clothing_accessories + "," +  "/"beauty/"" + ":" + beauty + "," + "/"IT/"" + ":" + IT + "," + "/"baby_Product/"" + ":" + baby_Product + "," + "/"food_service/"" + ":" + food_service + "," +  "/"home_furnishing/"" + ":" + home_furnishing + "," + "/"sports/"" + ":" + sports + "," + "/"outdoor_activities/"" + ":" + outdoor_activities + "," + "/"medicine/"" + ":" + medicine +  "}";  }  def loadDataFrame(ctx: SQLContext, appkey: String, startDay: String, endDay: String): DataFrame = { val path = s"$REP_HOME/appstatistic" ctx.read.parquet(path)  .filter(s"timestamp is not null and appkey='$appkey' and day>='$startDay' and day<='$endDay'") }  }

以上这篇DataFrame:通过SparkSql将scala类转为DataFrame的方法就是小编分享给大家的全部内容了,希望能给大家一个参考,也希望大家多多支持武林网。

发表评论 共有条评论
用户名: 密码:
验证码: 匿名发表