博客
关于我
Spark SQL项目流程分析总结
阅读量:149 次
发布时间:2019-02-27

本文共 4349 字,大约阅读时间需要 14 分钟。

使用properties文件存储配置信息:my.properties

在本项目中,我们采用properties文件来存储配置信息,这样可以有效避免手动输入错误并提高配置管理的统一性。以下是常用配置项:

spark.local=true&local.test.log.path=D:/JavaProjects/ClassStudy/Scala/express/files/test.log

MySQL连接配置:

jdbc.url=jdbc:mysql://192.168.221.140:3306/kdcjdbc.user=rootjdbc.password=kb10

全局变量定义:Constants

为了确保配置信息的可维护性,我们定义了常量类Constants,用于从properties文件中读取配置值:

val SPARK_LOCAL = "spark.local"//本地测试日志的路径val LOCAL_TEST_LOG_PATH = "local.test.log.path"val JDBC_URL = "jdbc.url"val JDBC_USER = "jdbc.user"val JDBC_PASSWORD = "jdbc.password"

加载配置文件:ConfigurationManager

我们创建了ConfigurationManager类来加载properties文件并提供读取配置值的方法:

private val props = new Properties()try {     props.load(new FileInputStream("D:\\JavaProjects\\ClassStudy\\Scala\\express\\src\\main\\resources\\my.properties"))} catch {     case e:IOException => e.printStackTrace()}

此外,我们还提供了根据配置值类型返回不同的数据类型的方法:

def getBoolean(key:String):Boolean={     val value: String = props.getProperty(key)  try {       return value.toBoolean  } catch {       case e:Exception => e.printStackTrace()  }  false}def getString(key:String):String={     props.getProperty(key)}def getInteger(key:String):Int={     0}

工具类

4.1、spark配置工具类01:SparkUtils

SparkUtils类用于处理spark配置,根据配置文件中的spark.local值决定是本地模式还是集群模式:

def getSparkConf(): SparkConf = {     var conf: SparkConf = null  if (ConfigurationManager.getBoolean(Constants.SPARK_LOCAL)) {       conf = new SparkConf()      .setMaster("local[4]")      .setAppName(this.getClass.getName())  } else {       conf = new SparkConf()      .setAppName(this.getClass.getName())  }  conf}

此外,还提供了创建SparkSession的方法:

def getSparkSession():SparkSession={     SparkSession.builder()    .config(getSparkConf())    .getOrCreate()}

4.2、DataFrame的Schema工具类:SchemaUtils

SchemaUtils类用于定义数据框架的结构,用于表的清洗和处理:

val access_logs_schema = StructType(Array(  StructField("event_time", StringType, true),  StructField("url", StringType, true))val full_access_logs_schema = StructType(Array(    StructField("event_time", StringType, true),    StructField("userSID", StringType, true))

4.3、数据库连接的工具类:DBUtils

DBUtils类用于获取数据库连接信息并处理配置:

def getDBProps():Tuple2[String,Properties]={     // 定义Mysql的url  val url: String = ConfigurationManager.getString(Constants.JDBC_URL)  // 定义Mysql的配置  val props = new Properties()  // 向配置中添加用户名和密码  props.put("user",ConfigurationManager.getString(Constants.JDBC_USER))  props.put("password",ConfigurationManager.getString(Constants.JDBC_PASSWORD))  (url,props)}

数据清洗类

5.1、数据清洗

数据清洗类主要负责对日志数据进行预处理和清洗,支持通过命令行参数指定日志路径:

var logPath = ""if (args.length == 1) {     logPath = args(0)} else {     logPath = ConfigurationManager.getString(Constants.LOCAL_TEST_LOG_PATH)}

清洗流程包括文件读取、格式校验、去重和空值处理等:

  • 读文件时,若文件格式不规范,使用filter算子筛选保留固定长度的数据
  • 使用dropDuplicates(col1,col2…)对重复数据进行去重
  • 使用StringUtils.isNotEmpty(str)方法过滤掉空值
  • 对Row类型的RDD通过getAs方法获取某一列的数据并做处理
  • 首次清洗分列后,可以根据需求对某列数据再次分列处理

val rowRDD: RDD[Row] = spark.sparkContext.textFile(logPath)      .map(_.split("\t"))      .filter(_.length == 8)      .map(x => Row(x(0), x(1), x(2), x(3), x(4), x(5), x(6), x(7)))	//首次全局清洗	val filteredLog: Dataset[Row] = logsDF.dropDuplicates("event_time", "url")      .filter(x => x(3) == "200")      .filter(x => StringUtils.isNotEmpty(x(0).toString))   	 //第二次数据清洗_val fullAccessLogRDD = filteredLog.map(line => {         val params: Array[String] = line.getAs[String]("url")        .split("\\?")      var paramsMap: Map[String, String] = Map.empty      if (params.length == 2) {           paramsMap = params(1)          .split("&")          .map(_.split("="))          .filter(_.length == 2)          .map(x => (x(0), x(1)))          .toMap      }      //第三次细节清洗

5.2、数据导入MySQL表中

数据导入MySQL表中支持两种模式:append和overwrite。同时支持日志记录:

import org.apache.log4j.Logger val logger: Logger = Logger.getLogger(this.getClass)

数据分析类

6.1、SparkSession创建

数据分析类主要负责从数据库读取数据进行分析并存储结果:

//1、SparkSession创建    val spark = SparkUtils.getSparkSession()    import spark.implicits._    import org.apache.spark.sql.functions._    //2、获取数据库配置    val (url,props) = DBUtils.getDBProps()    //3、读取输入表、输出表    val inputTableName = Constants.KDC_FULL_ACCESS_LOGS_TABLE_NAME    val outputTableName = Constants.KDC_ACTIVE_USER_STATICS    //4、从MySQL中读取DF数据    val full_access_logs = spark.read.jdbc(url,inputTableName,props)    //5、计算、生成newDF结果数据        //6、把结果数据存到MySQL    newDF.write.mode("overwrite or append").jdbc(url,outputTableName,props)

转载地址:http://vwqb.baihongyu.com/

你可能感兴趣的文章
Objective-C实现anagrams字谜算法(附完整源码)
查看>>
Objective-C实现ApproximationMonteCarlo蒙特卡洛方法计算pi值算法 (附完整源码)
查看>>
Objective-C实现area under curve曲线下面积算法(附完整源码)
查看>>
Objective-C实现arithmetic算术算法(附完整源码)
查看>>
Objective-C实现armstrong numbers阿姆斯壮数算法(附完整源码)
查看>>
Objective-C实现articulation-points(关键点)(割点)算法(附完整源码)
查看>>
Objective-C实现atoi函数功能(附完整源码)
查看>>
Objective-C实现average absolute deviation平均绝对偏差算法(附完整源码)
查看>>
Objective-C实现average mean平均数算法(附完整源码)
查看>>
Objective-C实现average median平均中位数算法(附完整源码)
查看>>
Objective-C实现average mode平均模式算法(附完整源码)
查看>>
Objective-C实现avl 树算法(附完整源码)
查看>>
Objective-C实现AvlTree树算法(附完整源码)
查看>>
Objective-C实现backtracking Jump Game回溯跳跃游戏算法(附完整源码)
查看>>
Objective-C实现BACKTRACKING 方法查找集合的幂集算法(附完整源码)
查看>>
Objective-C实现bailey borwein plouffe算法(附完整源码)
查看>>
Objective-C实现base64加密和base64解密算法(附完整源码)
查看>>
Objective-C实现base64加解密(附完整源码)
查看>>
Objective-C实现base64编码 (附完整源码)
查看>>
Objective-C实现base85 编码算法(附完整源码)
查看>>