博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
spark操作总结
阅读量:7040 次
发布时间:2019-06-28

本文共 3454 字,大约阅读时间需要 11 分钟。

一、sparkContext与sparkSession区别

任何Spark程序都是SparkContext开始的,SparkContext的初始化需要一个SparkConf对象,SparkConf包含了Spark集群配置的各种参数,sparkContext只能在driver机器上面启动;

SparkSession: SparkSession实质上是SQLContext和HiveContext的组合,SparkSession内部封装了sparkContext,所以计算实际上是由sparkContext完成

val conf: SparkConf = new SparkConf().setAppName("test")val spark: SparkSession = SparkSession.builder().config(conf).enableHiveSupport().getOrCreate()

 

二、repartition与coalesce区别

repartition一般是用来增加分区数(当然也可以减少),coalesce只能用来减少分区数。所以如果不介意保存的文件块大小不一样,可以使用coalesce来减少分区数,保存的时候一个分区就会生成一个文件块

 

三、Scala常用方法

1. StringBuilder

主要用于字符串的拼接,可作用于生成倒排序列,如:val userItemScore = sc.parallelize(List((1001, 1, 0.8), (1001, 2, 0.7), (1001, 3, 0.5), (1001, 4, 0.9)))userItemScore.map(x => (x._1, (x._2.toString, x._3.toString))).groupByKey().map{x =>val userid = x._1val item_score_list = x._2val tmp_arr = item_score_list.toArray.sortWith(_._2 > _._2)val watch_len = tmp_arr.lengthval strbuf = new StringBuilder()for (i <- 0 until watch_len - 1) {strbuf ++= tmp_arr(i)._1strbuf.append(":")strbuf ++= tmp_arr(i)._2strbuf.append(" ")}strbuf ++= tmp_arr(watch_len - 1)._1strbuf.append(":")strbuf ++= tmp_arr(watch_len - 1)._2userid + "\t" + strbuf}.collect()

 

2. scala.collection.mutable.ArrayBuffer

相当于是一个大小可变数组,把需要的值添加进来,例如:val tmpArray = new ArrayBuffer[String]()val tmpArray = new ArrayBuffer[Int]()val tmpArray = new ArrayBuffer[(String, Int)]()scala> tmpArray.append(("wangzai", 1))scala> tmpArrayres11: scala.collection.mutable.ArrayBuffer[(String, Int)] = ArrayBuffer((wangzai,1), (test,2))tmpArray.indexOf(("test",2))为获取当前值的索引,返回类型为整型tmpArray.slice(tmpArray.indexOf(("test", 2)), tmpArray.length)为切片,返回类型为ArrayBuffer

 

四、通过spark-shell来操作数据库中的表

1 启动(通过--jars指定包,后面reids包不需要,只是演示添加多个包的用法)

/xxx/spark/bin/spark-shell \--master spark://xxx:7077 \--executor-cores 1 \--total-executor-cores 5 \--driver-memory 2g \--jars /xxx/jars/mysql-connector-java-5.1.38.jar,/xxx/jars/jedis-2.9.0.jar

2 在命令行中输入::paste, 然后粘贴以下代码,最后ctrl+D退出之后,即可执行

 

import java.util.Properties

import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.SparkConf
val conf: SparkConf = new SparkConf()
val spark: SparkSession = SparkSession.builder().config(conf).getOrCreate()
val mysqlUrl: String = "jdbc:mysql://ip:port/database?useUnicode=true&characterEncoding=UTF-8&useSSL=false"
val productTable: String = "product_info"
val orderTable: String = "order_info"
val properties: Properties = new Properties()
properties.put("user", user)
properties.put("password", password)

 

// 获取同事购配置表数据

val productDF: DataFrame = spark.read.jdbc(mysqlUrl, productTable, properties).select("id", "name")
val orderDF: DataFrame = spark.read.jdbc(mysqlUrl, orderTable, properties).select("product_id", "createTime")

 

val totalDataDF = productDF.join(orderDF, orderDF("product_id") === productDF("id")).drop("id")

//如果product_info对应的id为product_id,即关联id字段名不相同
//val totalDataDF = productDF.join(orderDF, Seq("product_id"))

3 把该DateFrame注册为临时表才能通过spark-sql操作

totalDataDF.createOrReplaceTempView("totalDataDF")

 

五、spark-sql的基本操作

//默认显示20条数据scala> df.show()//打印模式信息scala> df.printSchema()//选择多列scala> df.select(df("name"),df("age")+1).show()// 条件过滤scala> df.filter(df("age") > 20 ).show()// 分组聚合scala> df.groupBy("age").count().show()// 排序scala> df.sort(df("age").desc).show()//多列排序scala> df.sort(df("age").desc, df("name").asc).show()//对列进行重命名scala> df.select(df("name").as("username"),df("age")).show()

 

 

转载于:https://www.cnblogs.com/654wangzai321/p/11096992.html

你可能感兴趣的文章
NEC面部识别系统助力台北世界大学生运动会
查看>>
nfs
查看>>
UltraEdit实现“删除包含某个关键字的所有行”
查看>>
WSFC 维护模式操作粒度控制
查看>>
linux kill 命令
查看>>
为什么使用useLegacyV2RuntimeActivationPolicy?
查看>>
Shell工作笔记01
查看>>
windows 2008 R2搭建简单WEB服务器
查看>>
hyper-v故障转移群集之4、创建群集
查看>>
webpack命令行
查看>>
多网卡的7种bond模式原理
查看>>
用update和replace在sql中替换某一个字段的部分内容
查看>>
Web框架原理
查看>>
HEX解码
查看>>
.pyc是什么鬼?
查看>>
golang 详解defer
查看>>
流程控制-for序列、流程控制-for字典
查看>>
Go语言之反射
查看>>
dTree JS 基本用法
查看>>
Android Things创客DIY第一课-用Android Things展示你的智能设备创意-基础篇
查看>>