博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
Spark SQL项目流程分析总结
阅读量:144 次
发布时间:2019-02-27

本文共 4491 字,大约阅读时间需要 14 分钟。

文章目录

1、使用properties文件存储配置信息:my.properties

  • 是否是本地模式
  • 源数据存储路径
spark.local=truelocal.test.log.path=D:/JavaProjects/ClassStudy/Scala/express/files/test.log
  • MySQL连接配置:URL;user;password
jdbc.url=jdbc:mysql://192.168.221.140:3306/kdcjdbc.user=rootjdbc.password=kb10

2、全局变量定义:Constants

  • properties里的value值,防止手动输入错误
val SPARK_LOCAL = "spark.local"//本地测试日志的路径val LOCAL_TEST_LOG__PATH = "local.test.log.path"val JDBC_URL = "jdbc.url"val JDBC_USER = "jdbc.user"val JDBC_PASSWORD = "jdbc.password"

3、加载配置文件:ConfigurationManager

  • 使用properties的load(new InputStream)方法读取路径信息。trycatch一下
private val props = new Properties()// 加载配置文件try {
props.load(new FileInputStream("D:\\JavaProjects\\ClassStudy\\Scala\\express\\src\\main\\resources\\my.properties"))} catch {
case e:IOException => e.printStackTrace()}
  • 对外提供一些类型转换的方法,根据配置文件中的value数据类型不同,返回不同的数据类型。比如说toBoolean
def getBoolean(key:String):Boolean={
val value: String = props.getProperty(key) try {
return value.toBoolean } catch {
case e:Exception => e.printStackTrace() } false}def getString(key:String):String={
props.getProperty(key)}def getInteger(key:String):Int={
0}
  • getProperty(key)方法获取properties的value值

4、工具类

4.1、spark配置工具类01:SparkUtils

  • 如果是本地模式,则setMaster(“local[*]”);如果是集群模式,则则不用设置Master
def getSparkConf(): SparkConf = {
var conf: SparkConf = null if (ConfigurationManager.getBoolean(Constants.SPARK_LOCAL)) {
conf = new SparkConf() .setMaster("local[4]") .setAppName(this.getClass.getName()) } else {
conf = new SparkConf() .setAppName(this.getClass.getName()) } conf}
  • 创建SparkSession
def getSparkSession():SparkSession={
SparkSession.builder() .config(getSparkConf()) .getOrCreate()}

4.2、DataFrame的Schema工具类:SchemaUtils

  • 对表的首次清洗和后续清洗所用到的StructType汇总
val access_logs_schema = StructType(Array(  StructField("event_time", StringType, true),  StructField("url", StringType, true))val full_access_logs_schema = StructType(Array(    StructField("event_time", StringType, true),    StructField("userSID", StringType, true))

4.3、数据库连接的工具类:DBUtils

  • 获取数据库链接、用户、密码的类
def getDBProps():Tuple2[String,Properties]={
// 定义Mysql的url val url: String = ConfigurationManager.getString(Constants.JDBC_URL) // 定义Mysql的配置 val props = new Properties() // 向配置中添加用户名和密码 props.put("user",ConfigurationManager.getString(Constants.JDBC_USER)) props.put("password",ConfigurationManager.getString(Constants.JDBC_PASSWORD)) (url,props)}

5、数据清洗类

5.1、数据清洗

  • 可设置args入口参数选项,供jar包使用
var logPath = ""if (args.length == 1) {
logPath = args(0)} else {
logPath = ConfigurationManager.getString(Constants.LOCAL_TEST_LOG__PATH)}
  • 读文件时,若文件格式不规范,使用filter算子筛选保留固定长度的数据
  • 使用dropDuplicates(col1,col2…)对重复数据进行去重
  • 使用StringUtils.isNotEmpty(str)方法过滤掉空值
  • 对Row类型的RDD通过getAs方法获取某一列的数据并做处理
  • 首次清洗分列后,可以根据需求对某列数据再次分列处理(参考以下代码的多次清洗)
val rowRDD: RDD[Row] = spark.sparkContext.textFile(logPath)      .map(_.split("\t"))      .filter(_.length == 8)      .map(x => Row(x(0), x(1), x(2), x(3), x(4), x(5), x(6), x(7)))	//首次全局清洗	val filteredLog: Dataset[Row] = logsDF.dropDuplicates("event_time", "url")      .filter(x => x(3) == "200")      .filter(x => StringUtils.isNotEmpty(x(0).toString))   	 //第二次数据清洗	val fullAccessLogRDD = filteredLog.map(line => {
val params: Array[String] = line.getAs[String]("url") .split("\\?") var paramsMap: Map[String, String] = Map.empty if (params.length == 2) {
paramsMap = params(1) .split("&") .map(_.split("=")) .filter(_.length == 2) .map(x => (x(0), x(1))) .toMap } //第三次细节清洗

5.2、数据导入MySQL表中

  • 数据save模式有append和overwrite两种
  • 使用logger.info(s"")方法输出日志信息。需要导包。
import org.apache.log4j.Logger val logger: Logger = Logger.getLogger(this.getClass)

6、数据分析类

  • SparkSession创建
  • 获取数据库配置
  • 定义输入表、输出表
  • 从MySQL中读取DF数据:spark.read.jdbc(url,inputTableName,props)
  • 计算、生成newDF结果数据
  • 把结果数据存到MySQL:newDF.write.mode(“overwrite or append”).jdbc(url,outputTableName,props)
//1、SparkSession创建    val spark = SparkUtils.getSparkSession()    import spark.implicits._    import org.apache.spark.sql.functions._    //2、获取数据库配置    val (url,props) = DBUtils.getDBProps()    //3、读取输入表、输出表    val inputTableName = Constants.KDC_FULL_ACCESS_LOGS_TABLE_NAME    val outputTableName = Constants.KDC_ACTIVE_USER_STATICS    //4、从MySQL中读取DF数据    val full_access_logs = spark.read.jdbc(url,inputTableName,props)    //5、计算、生成newDF结果数据        //6、把结果数据存到MySQL    newDF.write.mode("overwrite or append").jdbc(url,outputTableName,props)

转载地址:http://vwqb.baihongyu.com/

你可能感兴趣的文章