# my.properties
spark.local=true
local.test.log.path=D:/JavaProjects/ClassStudy/Scala/express/files/test.log

jdbc.url=jdbc:mysql://192.168.221.140:3306/kdc
jdbc.user=root
jdbc.password=kb10
val SPARK_LOCAL = "spark.local"
// path of the log file used for local testing
val LOCAL_TEST_LOG__PATH = "local.test.log.path"
val JDBC_URL = "jdbc.url"
val JDBC_USER = "jdbc.user"
val JDBC_PASSWORD = "jdbc.password"
private val props = new Properties()

// load the configuration file
try {
  props.load(new FileInputStream("D:\\JavaProjects\\ClassStudy\\Scala\\express\\src\\main\\resources\\my.properties"))
} catch {
  case e: IOException => e.printStackTrace()
}
def getBoolean(key: String): Boolean = {
  val value: String = props.getProperty(key)
  try {
    return value.toBoolean
  } catch {
    case e: Exception => e.printStackTrace()
  }
  false
}

def getString(key: String): String = {
  props.getProperty(key)
}

// the original left this as a stub that always returned 0; parse the value like getBoolean does
def getInteger(key: String): Int = {
  val value: String = props.getProperty(key)
  try {
    return value.toInt
  } catch {
    case e: Exception => e.printStackTrace()
  }
  0
}
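Loading from an absolute Windows path ties the code to one machine. A more portable alternative, shown here only as a sketch and not part of the original project, is to load my.properties from the classpath (it already sits in src/main/resources, so it is on the runtime classpath):

import java.io.IOException
import java.util.Properties

// Sketch: load my.properties from the classpath instead of a hard-coded absolute path.
private val props = new Properties()
try {
  val in = getClass.getClassLoader.getResourceAsStream("my.properties")
  if (in != null) props.load(in)
} catch {
  case e: IOException => e.printStackTrace()
}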
def getSparkConf(): SparkConf = {
  var conf: SparkConf = null
  if (ConfigurationManager.getBoolean(Constants.SPARK_LOCAL)) {
    // local mode for testing
    conf = new SparkConf()
      .setMaster("local[4]")
      .setAppName(this.getClass.getName())
  } else {
    // cluster mode: the master is supplied by spark-submit
    conf = new SparkConf()
      .setAppName(this.getClass.getName())
  }
  conf
}
def getSparkSession(): SparkSession = {
  SparkSession.builder()
    .config(getSparkConf())
    .getOrCreate()
}
val access_logs_schema = StructType(Array(
  StructField("event_time", StringType, true),
  StructField("url", StringType, true)
  // (remaining fields omitted)
))

val full_access_logs_schema = StructType(Array(
  StructField("event_time", StringType, true),
  StructField("userSID", StringType, true)
  // (remaining fields omitted)
))
def getDBProps(): (String, Properties) = {
  // the MySQL JDBC url
  val url: String = ConfigurationManager.getString(Constants.JDBC_URL)
  // MySQL connection properties
  val props = new Properties()
  // add the user name and password
  props.put("user", ConfigurationManager.getString(Constants.JDBC_USER))
  props.put("password", ConfigurationManager.getString(Constants.JDBC_PASSWORD))
  (url, props)
}
val logPath = if (args.length == 1) {
  args(0)
} else {
  ConfigurationManager.getString(Constants.LOCAL_TEST_LOG__PATH)
}
val rowRDD: RDD[Row] = spark.sparkContext.textFile(logPath)
  .map(_.split("\t"))
  .filter(_.length == 8)
  .map(x => Row(x(0), x(1), x(2), x(3), x(4), x(5), x(6), x(7)))

// build the DataFrame from the RDD (assumes access_logs_schema covers all eight fields)
val logsDF: DataFrame = spark.createDataFrame(rowRDD, access_logs_schema)

// first, global cleaning: drop duplicates, keep only HTTP 200 rows with a non-empty event_time
val filteredLog: Dataset[Row] = logsDF.dropDuplicates("event_time", "url")
  .filter(x => x(3) == "200")
  .filter(x => StringUtils.isNotEmpty(x(0).toString))

// second cleaning: split the query string of the url into a key/value map
val fullAccessLogRDD = filteredLog.map(line => {
  val params: Array[String] = line.getAs[String]("url")
    .split("\\?")
  var paramsMap: Map[String, String] = Map.empty
  if (params.length == 2) {
    paramsMap = params(1)
      .split("&")
      .map(_.split("="))
      .filter(_.length == 2)
      .map(x => (x(0), x(1)))
      .toMap
  }
  // third, fine-grained cleaning (the rest of this lambda is not shown)
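The query-string parsing above is plain Scala, so it can be verified outside Spark. A minimal standalone check (the sample URL and its parameters are made up for illustration):

// Standalone check of the url parameter parsing; the URL is a made-up sample.
object ParamParseDemo extends App {
  val url = "http://example.com/course?userSID=123&actionBegin=1604969695"
  val params = url.split("\\?")
  val paramsMap: Map[String, String] =
    if (params.length == 2)
      params(1).split("&")
        .map(_.split("="))
        .filter(_.length == 2)
        .map(x => (x(0), x(1)))
        .toMap
    else Map.empty
  println(paramsMap) // Map(userSID -> 123, actionBegin -> 1604969695)
}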
import org.apache.log4j.Logger

val logger: Logger = Logger.getLogger(this.getClass)
// 1. create the SparkSession
val spark = SparkUtils.getSparkSession()
import spark.implicits._
import org.apache.spark.sql.functions._

// 2. read the database configuration
val (url, props) = DBUtils.getDBProps()

// 3. input and output table names
val inputTableName = Constants.KDC_FULL_ACCESS_LOGS_TABLE_NAME
val outputTableName = Constants.KDC_ACTIVE_USER_STATICS

// 4. read the DataFrame from MySQL
val full_access_logs = spark.read.jdbc(url, inputTableName, props)

// 5. compute the result data (newDF)

// 6. save the result back to MySQL ("overwrite" or "append", as required)
newDF.write.mode("overwrite").jdbc(url, outputTableName, props)
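Step 5 is left blank above. As a purely hypothetical sketch, assuming "active users" means the number of distinct userSID values per day and that event_time strings begin with a yyyy-MM-dd date (neither assumption comes from the original post):

// Hypothetical step 5: daily active users as distinct userSID per day.
// Assumes event_time starts with "yyyy-MM-dd"; not from the original post.
val newDF = full_access_logs
  .withColumn("date", substring($"event_time", 1, 10))
  .groupBy("date")
  .agg(countDistinct("userSID").as("active_users"))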