
Big Data Primer: Spark+Kudu Advertising Business Project Notes (6)


Spark+Kudu advertising business project series:
Spark+Kudu Advertising Business Project Notes (1)


Goal of this chapter: package the code and run it on the server.

1. Putting the Data on HDFS

First, start Hadoop:
[hadoop@hadoop000 ~]$ cd app/
[hadoop@hadoop000 app]$ ls
apache-maven-3.6.3      hive-1.1.0-cdh5.15.1  spark-2.4.5-bin-hadoop2.6
hadoop-2.6.0-cdh5.15.1  jdk1.8.0_91           tmp
[hadoop@hadoop000 app]$ cd hadoop-2.6.0-cdh5.15.1/
[hadoop@hadoop000 hadoop-2.6.0-cdh5.15.1]$ ls
bin             etc                   include  LICENSE.txt  README.txt  src
bin-mapreduce1  examples              lib      logs         sbin
cloudera        examples-mapreduce1   libexec  NOTICE.txt   share
[hadoop@hadoop000 hadoop-2.6.0-cdh5.15.1]$ cd sbin/
[hadoop@hadoop000 sbin]$ ls
distribute-exclude.sh    slaves.sh          stop-all.sh
hadoop-daemon.sh         start-all.cmd      stop-balancer.sh
hadoop-daemons.sh        start-all.sh       stop-dfs.cmd
hdfs-config.cmd          start-balancer.sh  stop-dfs.sh
hdfs-config.sh           start-dfs.cmd      stop-secure-dns.sh
httpfs.sh                start-dfs.sh       stop-yarn.cmd
kms.sh                   start-secure-dns.sh  stop-yarn.sh
Linux                    start-yarn.cmd     yarn-daemon.sh
mr-jobhistory-daemon.sh  start-yarn.sh      yarn-daemons.sh
refresh-namenodes.sh     stop-all.cmd
[hadoop@hadoop000 sbin]$ ./start-all.sh
This script is Deprecated. Instead use start-dfs.sh and start-yarn.sh
Starting namenodes on [hadoop000]
hadoop000: starting namenode, logging to /home/hadoop/app/hadoop-2.6.0-cdh5.15.1/logs/hadoop-hadoop-namenode-hadoop000.out
hadoop000: starting datanode, logging to /home/hadoop/app/hadoop-2.6.0-cdh5.15.1/logs/hadoop-hadoop-datanode-hadoop000.out
Starting secondary namenodes [0.0.0.0]
0.0.0.0: starting secondarynamenode, logging to /home/hadoop/app/hadoop-2.6.0-cdh5.15.1/logs/hadoop-hadoop-secondarynamenode-hadoop000.out
starting yarn daemons
starting resourcemanager, logging to /home/hadoop/app/hadoop-2.6.0-cdh5.15.1/logs/yarn-hadoop-resourcemanager-hadoop000.out
hadoop000: starting nodemanager, logging to /home/hadoop/app/hadoop-2.6.0-cdh5.15.1/logs/yarn-hadoop-nodemanager-hadoop000.out
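As a quick sanity check (an addition, not in the original notes), jps should now show all the HDFS and YARN daemons:

[hadoop@hadoop000 sbin]$ jps
# expected: NameNode, DataNode, SecondaryNameNode, ResourceManager, NodeManager, Jps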
Create a directory on HDFS. The planned layout is one directory per day of data, named in YYYYMMDD format:
[hadoop@hadoop000 sbin]$ hadoop fs -mkdir -p /tai/access/20181007
Put data-test.json on HDFS:
[hadoop@hadoop000 sbin]$ hadoop fs -put ~/data/data-test.json /tai/access/20181007/
Check the HDFS web UI on port 50070: the upload succeeded.
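The same can be verified from the command line (an added check, not in the original):

[hadoop@hadoop000 sbin]$ hadoop fs -ls /tai/access/20181007
# should list data-test.json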
Upload ip.txt to HDFS, same as before:
hadoop fs -put ~/data/ip.txt /tai/access/

2. Refactoring for Scheduled Runs

The job runs once a day, at 3 a.m., so the batch date to process must be passed in. In SparkApp.scala:
package com.imooc.bigdata.cp08

import com.imooc.bigdata.cp08.business.{AppStatProcessor, AreaStatProcessor, LogETLProcessor, ProvinceCityStatProcessor}
import org.apache.commons.lang3.StringUtils
import org.apache.spark.internal.Logging
import org.apache.spark.sql.SparkSession

// Entry point for the whole project
object SparkApp extends Logging {

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("SparkApp")
      .getOrCreate()

    // spark-submit ... --conf spark.time=20181007
    // (Spark only forwards conf keys that start with "spark.")
    val time = spark.sparkContext.getConf.get("spark.time")
    if (StringUtils.isBlank(time)) {
      // Abort when no batch date was passed in
      logError("the batch date to process must not be empty")
      System.exit(0)
    }

    LogETLProcessor.process(spark)
    ProvinceCityStatProcessor.process(spark)
    AreaStatProcessor.process(spark)
    AppStatProcessor.process(spark)

    spark.stop()
  }
}
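One caveat worth noting (my addition, not from the original notes): SparkConf.get(key) throws a NoSuchElementException when the key is missing entirely, so the StringUtils.isBlank check only catches the case where spark.time is set to an empty value. A more defensive variant supplies a default:

val time = spark.sparkContext.getConf.get("spark.time", "")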
Since output names now have to follow the pattern table name + date, add a DateUtils helper:
package com.imooc.bigdata.cp08.utils

import org.apache.spark.sql.SparkSession

object DateUtils {

  def getTableName(tableName: String, spark: SparkSession) = {
    val time = spark.sparkContext.getConf.get("spark.time")
    tableName + "_" + time
  }
}
Then replace every hard-coded table name with a call to this helper, for example:
val sourceTableName = DateUtils.getTableName("ods", spark)
val sinkTableName = DateUtils.getTableName("province_city_stat", spark)
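For context, here is a minimal sketch of how such a sink table name might be used when writing results to Kudu via the kudu-spark2 KuduContext API; the master address and the resultDF DataFrame are illustrative assumptions, not the project's actual code:

import org.apache.kudu.spark.kudu.KuduContext

// "hadoop000:7051" is a hypothetical Kudu master address
val kuduContext = new KuduContext("hadoop000:7051", spark.sparkContext)
// upsert the stats DataFrame computed upstream into the per-day table
kuduContext.upsertRows(resultDF, sinkTableName)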
Next, specify the paths of the two source files via configuration as well:
val rawPath = spark.sparkContext.getConf.get("spark.raw.path")
var jsonDF = spark.read.json(rawPath)

val ipRulePath = spark.sparkContext.getConf.get("spark.ip.path")
val ipRowRDD = spark.sparkContext.textFile(ipRulePath)
So the job takes three input parameters: spark.time, spark.raw.path and spark.ip.path.
Also, in production the SparkSession should be built like this, with no hard-coded master or appName; both are specified uniformly at spark-submit time:
val spark = SparkSession.builder().getOrCreate()

3. Packaging the Code

Package the code with Maven:
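The original shows this step as a screenshot; a typical command for a standard Maven project would be:

mvn clean package -DskipTests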
Upload the jar to the ~/lib directory on the server.
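For example (the local jar path is an assumption based on a standard Maven build):

scp target/sparksql-train-1.0.jar hadoop@hadoop000:~/lib/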
Locate the local kudu-client-1.7.0.jar and kudu-spark2_2.11-1.7.0.jar and upload them to ~/lib on the server as well.
Start Spark:
cd ~/app/spark-2.4.5-bin-hadoop2.6/sbin/
sh start-all.sh
Now write the Spark submit script and test it locally first:
vi job.sh
time=20181007
${SPARK_HOME}/bin/spark-submit \
--class com.imooc.bigdata.cp08.SparkApp \
--master local \
--jars /home/hadoop/lib/kudu-client-1.7.0.jar,/home/hadoop/lib/kudu-spark2_2.11-1.7.0.jar \
--conf spark.time=$time \
--conf spark.raw.path="hdfs://hadoop000:8020/tai/access/$time" \
--conf spark.ip.path="hdfs://hadoop000:8020/tai/access/ip.txt" \
/home/hadoop/lib/sparksql-train-1.0.jar
Run the script:
sh job.sh
Check the result: the data has been written successfully.

4. Spark on YARN

vi jobyarn.sh
All that changes is --master, from local to yarn:
time=20181007
${SPARK_HOME}/bin/spark-submit \
--class com.imooc.bigdata.cp08.SparkApp \
--master yarn \
--jars /home/hadoop/lib/kudu-client-1.7.0.jar,/home/hadoop/lib/kudu-spark2_2.11-1.7.0.jar \
--conf spark.time=$time \
--conf spark.raw.path="hdfs://hadoop000:8020/tai/access/$time" \
--conf spark.ip.path="hdfs://hadoop000:8020/tai/access/ip.txt" \
/home/hadoop/lib/sparksql-train-1.0.jar
sh jobyarn.sh
Check YARN on port 8088: the job is running.
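The same can be confirmed from the command line (an added check, not in the original notes):

yarn application -list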

5. Scheduled Execution

Scheduling frameworks include crontab, Azkaban, Oozie, and others.
This project uses crontab (the original post links to a crontab-expression helper tool). To run once every hour, the expression is:
0 */1 * * *
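For reference, the five crontab fields are, left to right: minute, hour, day of month, month, day of week. So 0 */1 * * * fires at minute 0 of every hour.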
crontab -e   # edit
crontab -l   # list
crontab -r   # remove
Since the job starts at 3 a.m. and processes the previous day's data, yesterday's date is obtained with:
[hadoop@hadoop000 lib]$ date --date='1 day ago' +%Y%m%d
20200225
Modify the script accordingly:
time=`date --date='1 day ago' +%Y%m%d`
${SPARK_HOME}/bin/spark-submit \
--class com.imooc.bigdata.cp08.SparkApp \
--master local \
--jars /home/hadoop/lib/kudu-client-1.7.0.jar,/home/hadoop/lib/kudu-spark2_2.11-1.7.0.jar \
--conf spark.time=$time \
--conf spark.raw.path="hdfs://hadoop000:8020/tai/access/$time" \
--conf spark.ip.path="hdfs://hadoop000:8020/tai/access/ip.txt" \
/home/hadoop/lib/sparksql-train-1.0.jar
Set up the crontab entry:
crontab -e
0 3 * * * /home/hadoop/lib/job.sh
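One practical caveat (my addition, not from the original notes): cron runs jobs with a minimal environment, so ${SPARK_HOME} may be unset when the entry fires. A common fix is to export it at the top of job.sh and to redirect output to a log file in the crontab entry (the log path below is an example):

# at the top of job.sh
export SPARK_HOME=/home/hadoop/app/spark-2.4.5-bin-hadoop2.6

# crontab entry with logging
0 3 * * * /home/hadoop/lib/job.sh >> /home/hadoop/logs/job.log 2>&1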
Done!

Copyright notice:

This article was compiled by 大数据技术与架构 and is either original or exclusively authorized by the original author. Unauthorized reposting will be pursued as infringement.

Editor: 冷眼丶

WeChat public account: import_bigdata

