SparkSql初级编程实践


1.Spark SQL 基本操作
将下列 JSON 格式数据复制到 Linux 系统中,并保存命名为 employee.json。
{ "id":1 , "name":" Ella" , "age":36 }
{ "id":2, "name":"Bob","age":29 }
{ "id":3 , "name":"Jack","age":29 }
{ "id":4 , "name":"Jim","age":28 }
{ "id":4 , "name":"Jim","age":28 }
{ "id":5 , "name":"Damon" }
{ "id":5 , "name":"Damon" }
为 employee.json 创建 DataFrame,并写出 Scala 语句完成下列操作:

(1)查询所有数据;

(2)查询所有数据,并去除重复的数据;

(3)查询所有数据,打印时去除 id 字段;

(4)筛选出 age>30 的记录;

(5)将数据按 age 分组;

(6)将数据按 name 升序排列;

(7)取出前 3 行数据

(8)查询所有记录的 name 列,并为其取别名为 username

(9)查询年龄 age 的平均值;

(10) 查询年龄 age 的最小值。

2.编程实现将 RDD 转换为 DataFrame

源文件内容如下(包含 id,name,age):
1,Ella,36
2,Bob,29
3,Jack,29
请将数据复制保存到 Linux 系统中,命名为 employee.txt,实现从 RDD 转换得到
DataFrame,并按“id:1,name:Ella,age:36”的格式打印出 DataFrame 的所有数据。请写出程序代
码。

package cn.spark.study.sy5

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.sql.types.{StringType, StructField, StructType}


object Testsql {
  def main(args: Array[String]) {
       val conf = new SparkConf()
       conf.setMaster("local")
           .setAppName("Testsql")
       val sc = new SparkContext(conf)
       val sqlContext = new SQLContext(sc)
       //hdfs://192.168.6.134:9000/nlc/1.txt
       //H:\文件\数据集
       val studentRDD = sc.textFile("D:\\myDevelopTools\\Intellij IDEA\\workplace\\spark-study-scala\\src\\main\\java\\cn\\spark\\study\\sy5\\employee.txt", 1)
      .map { line => Row(line.split(",")(0), line.split(",")(1), line.split(",")(2)) }
        // 第二步,编程方式动态构造元数据
      val structType = StructType(Array(
          StructField("id", StringType, true),
          StructField("name", StringType, true),
          StructField("age", StringType, true)))
      // 第三步,进行RDD到DataFrame的转换
      val studentDF = sqlContext.createDataFrame(studentRDD, structType)
      // 继续正常使用
      studentDF.registerTempTable("employee")
//      val teenagerDF = sqlContext.sql("select usrid,count(usrid) from students group by usrid order by usrid")
      val teenagerDF = sqlContext.sql("select id,name,age from employee")
      val teenagerRDD = teenagerDF.rdd.collect().foreach { row => println("id:"+row(0)+",name:"+row(1)+",age:"+row(2)) }
    }
}

3. 编程实现利用 DataFrame 读写 MySQL 的数据
(1)在 MySQL 数据库中新建数据库 sparktest,再创建表 employee,包含如表 6-2 所示的
两行数据。
表 6-2 employee 表原有数据
id name gender Age
1 Alice F 22
2 John M 25
(2)配置 Spark 通过 JDBC 连接数据库 MySQL,编程实现利用 DataFrame 插入如表 6-3 所
示的两行数据到 MySQL 中,最后打印出 age 的最大值和 age 的总和。
表 6-3 employee 表新增数据
id name gender age
3 Mary F   26
4 Tom M   23

package cn.spark.study.sy5
import java.util.Properties

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.types._
import org.apache.spark.sql.{Row, SQLContext}
/**
  * Created by Lenovo on 2019/3/27.
  */
object TestMySQL {
  def main(args: Array[String]) {
    val conf = new SparkConf()
    conf.setMaster("local")
      .setAppName("Testsql")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)

    val employeeRDD = sqlContext.sparkContext.parallelize(Array("3 Mary F 26","4 Tom M 23")).map(_.split(" "))
    val schema = StructType(List(StructField("id", IntegerType,
      true),StructField("name", StringType, true),StructField("gender", StringType,
      true),StructField("age", IntegerType, true)))
    val rowRDD = employeeRDD.map(p => Row(p(0).toInt,p(1),
      p(2),p(3).toInt))
    val employeeDF = sqlContext.createDataFrame(rowRDD, schema)
    val prop = new Properties()
    prop.put("user", "root")
    prop.put("password", "123123")
    prop.put("driver","com.mysql.jdbc.Driver")
    employeeDF.write.mode("append").jdbc("jdbc:mysql://localhost:3306/sparktest", "sparktest.employee", prop)
      val jdbcDF = sqlContext.read.format("jdbc").option("url",
      "jdbc:mysql://localhost:3306/sparktest").option("driver","com.mysql.jdbc.Driver").option("dbtable","employee").option("user","root").option("password", "123123").load()
      jdbcDF.agg("age" -> "max", "age" -> "sum")
      }

}

优质内容筛选与推荐>>
1、noip 2005 luogu cogs P1052 过河 WD
2、redis 安装
3、浅析HBase region的单点问题
4、Dubbo项目demo搭建
5、哦


长按二维码向我转账

受苹果公司新规定影响,微信 iOS 版的赞赏功能被关闭,可通过二维码转账支持公众号。

    阅读
    好看
    已推荐到看一看
    你的朋友可以在“发现”-“看一看”看到你认为好看的文章。
    已取消,“好看”想法已同步删除
    已推荐到看一看 和朋友分享想法
    最多200字,当前共 发送

    已发送

    朋友将在看一看看到

    确定
    分享你的想法...
    取消

    分享想法到看一看

    确定
    最多200字,当前共

    发送中

    网络异常,请稍后重试

    微信扫一扫
    关注该公众号





    联系我们

    欢迎来到TinyMind。

    关于TinyMind的内容或商务合作、网站建议,举报不良信息等均可联系我们。

    TinyMind客服邮箱:support@tinymind.net.cn