博客
关于我
Spark SQL项目流程分析总结
阅读量:149 次
发布时间:2019-02-27

本文共 4475 字,大约阅读时间需要 14 分钟。

文章目录

1、使用properties文件存储配置信息:my.properties

  • 是否是本地模式
  • 源数据存储路径
spark.local=truelocal.test.log.path=D:/JavaProjects/ClassStudy/Scala/express/files/test.log
  • MySQL连接配置:URL;user;password
jdbc.url=jdbc:mysql://192.168.221.140:3306/kdcjdbc.user=rootjdbc.password=kb10

2、全局变量定义:Constants

  • properties里的value值,防止手动输入错误
val SPARK_LOCAL = "spark.local"//本地测试日志的路径val LOCAL_TEST_LOG__PATH = "local.test.log.path"val JDBC_URL = "jdbc.url"val JDBC_USER = "jdbc.user"val JDBC_PASSWORD = "jdbc.password"

3、加载配置文件:ConfigurationManager

  • 使用properties的load(new InputStream)方法读取路径信息。trycatch一下
private val props = new Properties()// 加载配置文件try {     props.load(new FileInputStream("D:\\JavaProjects\\ClassStudy\\Scala\\express\\src\\main\\resources\\my.properties"))} catch {     case e:IOException => e.printStackTrace()}
  • 对外提供一些类型转换的方法,根据配置文件中的value数据类型不同,返回不同的数据类型。比如说toBoolean
def getBoolean(key:String):Boolean={     val value: String = props.getProperty(key)  try {       return value.toBoolean  } catch {       case e:Exception => e.printStackTrace()  }  false}def getString(key:String):String={     props.getProperty(key)}def getInteger(key:String):Int={     0}
  • getProperty(key)方法获取properties的value值

4、工具类

4.1、spark配置工具类01:SparkUtils

  • 如果是本地模式,则setMaster(“local[*]”);如果是集群模式,则则不用设置Master
def getSparkConf(): SparkConf = {     var conf: SparkConf = null  if (ConfigurationManager.getBoolean(Constants.SPARK_LOCAL)) {       conf = new SparkConf()      .setMaster("local[4]")      .setAppName(this.getClass.getName())  } else {       conf = new SparkConf()      .setAppName(this.getClass.getName())  }  conf}
  • 创建SparkSession
def getSparkSession():SparkSession={     SparkSession.builder()    .config(getSparkConf())    .getOrCreate()}

4.2、DataFrame的Schema工具类:SchemaUtils

  • 对表的首次清洗和后续清洗所用到的StructType汇总
val access_logs_schema = StructType(Array(  StructField("event_time", StringType, true),  StructField("url", StringType, true))val full_access_logs_schema = StructType(Array(    StructField("event_time", StringType, true),    StructField("userSID", StringType, true))

4.3、数据库连接的工具类:DBUtils

  • 获取数据库链接、用户、密码的类
def getDBProps():Tuple2[String,Properties]={     // 定义Mysql的url  val url: String = ConfigurationManager.getString(Constants.JDBC_URL)  // 定义Mysql的配置  val props = new Properties()  // 向配置中添加用户名和密码  props.put("user",ConfigurationManager.getString(Constants.JDBC_USER))  props.put("password",ConfigurationManager.getString(Constants.JDBC_PASSWORD))  (url,props)}

5、数据清洗类

5.1、数据清洗

  • 可设置args入口参数选项,供jar包使用
var logPath = ""if (args.length == 1) {     logPath = args(0)} else {     logPath = ConfigurationManager.getString(Constants.LOCAL_TEST_LOG__PATH)}
  • 读文件时,若文件格式不规范,使用filter算子筛选保留固定长度的数据
  • 使用dropDuplicates(col1,col2…)对重复数据进行去重
  • 使用StringUtils.isNotEmpty(str)方法过滤掉空值
  • 对Row类型的RDD通过getAs方法获取某一列的数据并做处理
  • 首次清洗分列后,可以根据需求对某列数据再次分列处理(参考以下代码的多次清洗)
val rowRDD: RDD[Row] = spark.sparkContext.textFile(logPath)      .map(_.split("\t"))      .filter(_.length == 8)      .map(x => Row(x(0), x(1), x(2), x(3), x(4), x(5), x(6), x(7)))	//首次全局清洗	val filteredLog: Dataset[Row] = logsDF.dropDuplicates("event_time", "url")      .filter(x => x(3) == "200")      .filter(x => StringUtils.isNotEmpty(x(0).toString))   	 //第二次数据清洗	val fullAccessLogRDD = filteredLog.map(line => {         val params: Array[String] = line.getAs[String]("url")        .split("\\?")      var paramsMap: Map[String, String] = Map.empty      if (params.length == 2) {           paramsMap = params(1)          .split("&")          .map(_.split("="))          .filter(_.length == 2)          .map(x => (x(0), x(1)))          .toMap      }      //第三次细节清洗

5.2、数据导入MySQL表中

  • 数据save模式有append和overwrite两种
  • 使用logger.info(s"")方法输出日志信息。需要导包。
import org.apache.log4j.Logger val logger: Logger = Logger.getLogger(this.getClass)

6、数据分析类

  • SparkSession创建
  • 获取数据库配置
  • 定义输入表、输出表
  • 从MySQL中读取DF数据:spark.read.jdbc(url,inputTableName,props)
  • 计算、生成newDF结果数据
  • 把结果数据存到MySQL:newDF.write.mode(“overwrite or append”).jdbc(url,outputTableName,props)
//1、SparkSession创建    val spark = SparkUtils.getSparkSession()    import spark.implicits._    import org.apache.spark.sql.functions._    //2、获取数据库配置    val (url,props) = DBUtils.getDBProps()    //3、读取输入表、输出表    val inputTableName = Constants.KDC_FULL_ACCESS_LOGS_TABLE_NAME    val outputTableName = Constants.KDC_ACTIVE_USER_STATICS    //4、从MySQL中读取DF数据    val full_access_logs = spark.read.jdbc(url,inputTableName,props)    //5、计算、生成newDF结果数据        //6、把结果数据存到MySQL    newDF.write.mode("overwrite or append").jdbc(url,outputTableName,props)

转载地址:http://vwqb.baihongyu.com/

你可能感兴趣的文章
Mysql 重置自增列的开始序号
查看>>
mysql 锁机制 mvcc_Mysql性能优化-事务、锁和MVCC
查看>>
MySQL 错误
查看>>
mysql 随机数 rand使用
查看>>
MySQL 面试题汇总
查看>>
MySQL 面试,必须掌握的 8 大核心点
查看>>
MySQL 高可用性之keepalived+mysql双主
查看>>
MySQL 高性能优化规范建议
查看>>
mysql 默认事务隔离级别下锁分析
查看>>
Mysql--逻辑架构
查看>>
MySql-2019-4-21-复习
查看>>
mysql-5.6.17-win32免安装版配置
查看>>
mysql-5.7.18安装
查看>>
MySQL-Buffer的应用
查看>>
mysql-cluster 安装篇(1)---简介
查看>>
mysql-connector-java.jar乱码,最新版mysql-connector-java-8.0.15.jar,如何愉快的进行JDBC操作...
查看>>
mysql-connector-java各种版本下载地址
查看>>
mysql-EXPLAIN
查看>>
MySQL-Explain的详解
查看>>
mysql-group_concat
查看>>