本文共 4349 字,大约阅读时间需要 14 分钟。
在本项目中,我们采用properties文件来存储配置信息,这样可以有效避免手动输入错误并提高配置管理的统一性。以下是常用配置项:
spark.local=true&local.test.log.path=D:/JavaProjects/ClassStudy/Scala/express/files/test.log
MySQL连接配置:
jdbc.url=jdbc:mysql://192.168.221.140:3306/kdcjdbc.user=rootjdbc.password=kb10
为了确保配置信息的可维护性,我们定义了常量类Constants,用于从properties文件中读取配置值:
val SPARK_LOCAL = "spark.local"//本地测试日志的路径val LOCAL_TEST_LOG_PATH = "local.test.log.path"val JDBC_URL = "jdbc.url"val JDBC_USER = "jdbc.user"val JDBC_PASSWORD = "jdbc.password"
我们创建了ConfigurationManager类来加载properties文件并提供读取配置值的方法:
private val props = new Properties()try { props.load(new FileInputStream("D:\\JavaProjects\\ClassStudy\\Scala\\express\\src\\main\\resources\\my.properties"))} catch { case e:IOException => e.printStackTrace()} 此外,我们还提供了根据配置值类型返回不同的数据类型的方法:
def getBoolean(key:String):Boolean={ val value: String = props.getProperty(key) try { return value.toBoolean } catch { case e:Exception => e.printStackTrace() } false}def getString(key:String):String={ props.getProperty(key)}def getInteger(key:String):Int={ 0} SparkUtils类用于处理spark配置,根据配置文件中的spark.local值决定是本地模式还是集群模式:
def getSparkConf(): SparkConf = { var conf: SparkConf = null if (ConfigurationManager.getBoolean(Constants.SPARK_LOCAL)) { conf = new SparkConf() .setMaster("local[4]") .setAppName(this.getClass.getName()) } else { conf = new SparkConf() .setAppName(this.getClass.getName()) } conf} 此外,还提供了创建SparkSession的方法:
def getSparkSession():SparkSession={ SparkSession.builder() .config(getSparkConf()) .getOrCreate()} SchemaUtils类用于定义数据框架的结构,用于表的清洗和处理:
val access_logs_schema = StructType(Array( StructField("event_time", StringType, true), StructField("url", StringType, true))val full_access_logs_schema = StructType(Array( StructField("event_time", StringType, true), StructField("userSID", StringType, true)) DBUtils类用于获取数据库连接信息并处理配置:
def getDBProps():Tuple2[String,Properties]={ // 定义Mysql的url val url: String = ConfigurationManager.getString(Constants.JDBC_URL) // 定义Mysql的配置 val props = new Properties() // 向配置中添加用户名和密码 props.put("user",ConfigurationManager.getString(Constants.JDBC_USER)) props.put("password",ConfigurationManager.getString(Constants.JDBC_PASSWORD)) (url,props)} 数据清洗类主要负责对日志数据进行预处理和清洗,支持通过命令行参数指定日志路径:
var logPath = ""if (args.length == 1) { logPath = args(0)} else { logPath = ConfigurationManager.getString(Constants.LOCAL_TEST_LOG_PATH)} 清洗流程包括文件读取、格式校验、去重和空值处理等:
val rowRDD: RDD[Row] = spark.sparkContext.textFile(logPath) .map(_.split("\t")) .filter(_.length == 8) .map(x => Row(x(0), x(1), x(2), x(3), x(4), x(5), x(6), x(7))) //首次全局清洗 val filteredLog: Dataset[Row] = logsDF.dropDuplicates("event_time", "url") .filter(x => x(3) == "200") .filter(x => StringUtils.isNotEmpty(x(0).toString)) //第二次数据清洗_val fullAccessLogRDD = filteredLog.map(line => { val params: Array[String] = line.getAs[String]("url") .split("\\?") var paramsMap: Map[String, String] = Map.empty if (params.length == 2) { paramsMap = params(1) .split("&") .map(_.split("=")) .filter(_.length == 2) .map(x => (x(0), x(1))) .toMap } //第三次细节清洗 数据导入MySQL表中支持两种模式:append和overwrite。同时支持日志记录:
import org.apache.log4j.Logger val logger: Logger = Logger.getLogger(this.getClass)
数据分析类主要负责从数据库读取数据进行分析并存储结果:
//1、SparkSession创建 val spark = SparkUtils.getSparkSession() import spark.implicits._ import org.apache.spark.sql.functions._ //2、获取数据库配置 val (url,props) = DBUtils.getDBProps() //3、读取输入表、输出表 val inputTableName = Constants.KDC_FULL_ACCESS_LOGS_TABLE_NAME val outputTableName = Constants.KDC_ACTIVE_USER_STATICS //4、从MySQL中读取DF数据 val full_access_logs = spark.read.jdbc(url,inputTableName,props) //5、计算、生成newDF结果数据 //6、把结果数据存到MySQL newDF.write.mode("overwrite or append").jdbc(url,outputTableName,props) 转载地址:http://vwqb.baihongyu.com/