# Medical Data Analysis
## Starting the single-node cluster

1. Edit the cloud host's hosts file and add its internal IP under the mapping name hadoop000, so the host itself can SSH to hadoop000 as root without a password. Environment: Hadoop/Hive

```
# Set the hostname
hostnamectl set-hostname hadoop000
bash
# Add the hostname mapping
echo "172.18.2.86 hadoop000" >> /etc/hosts
# Passwordless login to this machine
ssh-keygen
# Append the local public key to the authorized_keys file
ssh-copy-id -i /root/.ssh/id_rsa.pub localhost
# Verify
ssh hadoop000
exit
```

2. Format the HDFS filesystem. Environment: Hadoop/Hive

3. Start the Hadoop cluster. Environment: Hadoop/Hive

```
hdfs namenode -format
cd /root/software/hadoop-2.7.7/sbin
./start-all.sh
```

4. Start the MySQL service. Environment: Hadoop/Hive

5. Initialize the Hive metastore database, enter the Hive client, and create a database named hive. Environment: Hadoop/Hive

```
# Adjust the Hive config to avoid heap overflow when running jobs [optional]
echo "export HADOOP_HEAPSIZE=2048" >> /root/software/apache-hive-2.3.4-bin/conf/hive-env.sh
# Start MySQL
systemctl start mysqld.service
# Initialize the metastore schema
schematool -dbType mysql -initSchema
# Start the metastore service
nohup hive --service metastore &
# Check the background job
jobs -l
# Enter the Hive client
hive
create database hive;
show databases;
exit;
```

## COVID-19 data analysis

1. Enter the Hive client and create a database named covid_ods to hold the raw data. Environment: Hadoop/Hive

```
# Run in the main bash session
hive
create database covid_ods;
use covid_ods;
```

2. Under covid_ods, create the covid table and load the raw data from /root/covid/covid_area.csv after stripping its header row (keep the file name unchanged). Environment: Hadoop/Hive

```
# Run in a new bash session: strip the header row
sed -i "1d" /root/covid/covid_area.csv
```

```
# Run in the main bash session, inside the covid_ods database
create table covid_ods.covid(
    continerName string comment '大洲',
    countryName string comment '国家',
    provinceName string comment '省份',
    province_confirm string comment '省份确诊人数',
    province_suspect string comment '省份感染人数',
    province_cured string comment '省份治愈人数',
    province_dead string comment '省份死亡人数',
    cityName string comment '城市/地区',
    city_confirm string comment '城市确诊人数',
    city_suspect string comment '城市感染人数',
    city_cured string comment '城市治愈人数',
    city_dead string comment '城市死亡人数',
    updateTime string comment '数据更新时间'
) comment '全球疫情数据源数据表'
row format delimited fields terminated by ',';

load data local inpath '/root/covid/covid_area.csv' into table covid_ods.covid;
```

3. Under covid_ods, create the covid_time table, which keeps only the useful columns and drops duplicates so that just the last update of each day remains (see the step notes). Environment: Hadoop/Hive

```
# Run in the main bash session
create table covid_ods.covid_time(
    provinceName string comment '省份',
    cityName string comment '城市/地区',
    city_confirm string comment '城市确诊人数',
    city_suspect string comment '城市感染人数',
    city_cured string comment '城市治愈人数',
    city_dead string comment '城市死亡人数',
    updateTime string comment '数据更新时间'
) comment '中国新冠疫情数据源数据表'
row format delimited fields terminated by ',';
```

4. Insert the filtered data into covid_ods.covid_time as required. Environment: Hadoop/Hive

Requirements: keep only clean data, deduplicating and dropping empty or dirty records; extract the feature columns; keep only each day's last update.
Feature columns: province, city/region, city confirmed, city suspected, city cured, city deaths, and update time.
The source contains several snapshots fetched on the same day; keep only the latest one per day by update time.
Also require countryName to be '中国', provinceName not to be '中国', and filter out rows with an empty city.

```
# Run in the main bash session
-- Window-function pattern: <aggregate/ranking function> over (partition by <partition columns> order by <sort columns>)

-- Check the ordering
select *, row_number() over (partition by cityName,substr(updateTime,1,10) order by updateTime desc) as num
from covid_ods.covid limit 10;

-- Country is 中国, province is not 中国, city is not empty
select *, row_number() over (partition by cityName,substr(updateTime,1,10) order by updateTime desc) as num
from covid_ods.covid
where countryName='中国' and provinceName!='中国' and cityName!='' limit 10;

-- Extract the useful columns, drop duplicates, keep only each day's last update,
-- and insert the result into covid_ods.covid_time
insert into table covid_ods.covid_time
select provinceName,cityName,city_confirm,city_suspect,city_cured,city_dead,updateTime
from (
    select *, row_number() over (partition by cityName,substr(updateTime,1,10) order by updateTime desc) as num
    from covid_ods.covid
    where countryName='中国' and provinceName!='中国' and cityName!=''
)t
where t.num=1;

select count(*) from covid_ods.covid_time;
select * from covid_ods.covid_time where cityName='莆田' order by updateTime desc;
```
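To confirm the deduplication behaved as intended, a quick check (a sketch, not one of the graded steps) is to group covid_time by city and day and look for any pair that still appears more than once:

```
-- Sanity check: after the insert, every (city, day) pair should occur
-- exactly once, so this query should return no rows
select cityName, substr(updateTime,1,10) as day, count(*) as cnt
from covid_ods.covid_time
group by cityName, substr(updateTime,1,10)
having count(*) > 1;
```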
5. Create a database named covid_dwd; this layer partitions the data so it can be fetched quickly. Environment: Hadoop/Hive

```
# Run in the main bash session
create database covid_dwd;
use covid_dwd;
```

6. Under covid_dwd, create the province table, partitioned by year and month; derive the previous day's date from each row's update time as an extra column, and insert the data (see the step notes for details). Environment: Hadoop/Hive

```
# Run in the main bash session
-- Create the table
create table covid_dwd.province(
    provinceName string comment '省份',
    cityName string comment '城市/地区',
    city_confirm string comment '城市确诊人数',
    city_suspect string comment '城市感染人数',
    city_cured string comment '城市治愈人数',
    city_dead string comment '城市死亡人数',
    updateTime string comment '数据更新时间',
    yesterday string comment '昨天更新时间'
) comment '中国新冠疫情数据源数据表'
partitioned by(yearinfo string,monthinfo string)
row format delimited fields terminated by ','
stored as orc tblproperties('orc.compress'='SNAPPY');

-- Dynamic partition settings
set hive.exec.dynamic.partition=true;
set hive.exec.dynamic.partition.mode=nonstrict;
set hive.exec.max.dynamic.partitions.pernode=100000;
set hive.exec.max.dynamic.partitions=100000;
set hive.exec.max.created.files=100000;
-- Hive compression
set hive.exec.compress.intermediate=true;
set hive.exec.compress.output=true;
-- Make compression take effect on write
set hive.exec.orc.compression.strategy=COMPRESSION;
-- Local mode
set hive.exec.mode.local.auto=true;
set mapreduce.map.memory.mb=1025;
set mapreduce.reduce.memory.mb=1025;
set hive.exec.mode.local.auto.input.files.max=25;

-- Insert the data
insert into table covid_dwd.province partition(yearinfo,monthinfo)
select
    provinceName,
    cityName,
    city_confirm,
    city_suspect,
    city_cured,
    city_dead,
    substr(updateTime,1,10) as updateTime,
    date_sub(updateTime,1) as yesterday,
    substr(updateTime,1,4) as yearinfo,
    substr(updateTime,6,2) as monthinfo
from covid_ods.covid_time;
```

7. Create a database named covid_dwm, used to compute the growth of each indicator per province. Environment: Hadoop/Hive

```
# Run in the main bash session
create database covid_dwm;
use covid_dwm;
```
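Partition pruning is the point of this layer. To see which partitions the dynamic-partition insert actually created, and to confirm that a filter on the partition columns scans only one of them, a quick check (a sketch, assuming the insert above succeeded; the 2020/01 values are taken from the sample data) is:

```
-- List the partitions created by the dynamic-partition insert
show partitions covid_dwd.province;

-- Filtering on yearinfo/monthinfo lets Hive prune to a single partition
-- instead of scanning the whole table
select provinceName, cityName, city_confirm, updateTime
from covid_dwd.province
where yearinfo='2020' and monthinfo='01'
limit 5;
```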
8. Under covid_dwm, create the two_day table, which pairs each day's record in province with the previous day's record, joining the two into a single row (see the step notes). Environment: Hadoop/Hive

```
# Run in the main bash session
-- Create the table
create table covid_dwm.two_day(
    provinceName string comment '省份',
    cityName string comment '城市/地区',
    city_confirm int comment '城市确诊人数',
    city_suspect int comment '城市感染人数',
    city_cured int comment '城市治愈人数',
    city_dead int comment '城市死亡人数',
    updateTime string comment '更新时间',
    city_confirm_before int comment '一天前城市确诊人数',
    city_suspect_before int comment '一天前城市感染人数',
    city_cured_before int comment '一天前城市治愈人数',
    city_dead_before int comment '一天前城市死亡人数',
    yesterday string comment '昨天更新时间'
) comment '中国当日数据和前一天数据汇总数据表'
partitioned by(yearinfo string,monthinfo string)
row format delimited fields terminated by ','
stored as orc tblproperties('orc.compress'='SNAPPY');

-- Inspect the source data
select * from covid_dwd.province limit 10;
-- 黑龙江省 七台河 1 0 0 0 2020-01-27 2020-01-26 2020 01
-- 黑龙江省 七台河 2 0 0 0 2020-01-28 2020-01-27 2020 01
-- Merging the two rows above yields one record:
-- 黑龙江省 七台河 2 0 0 0 2020-01-28 1 0 0 0 2020-01-27

select * from
(select provinceName,cityName,city_confirm,city_suspect,city_cured,city_dead,updateTime,yesterday,yearinfo,monthinfo from covid_dwd.province)a
join
(select provinceName,cityName,city_confirm,city_suspect,city_cured,city_dead,updateTime from covid_dwd.province)b
on a.provinceName=b.provinceName and a.cityName=b.cityName and a.yesterday=b.updateTime
limit 10;
-- Sample output (a's columns followed by b's):
-- 黑龙江省 七台河 2 0 0 0 2020-01-28 2020-01-27 2020 01 黑龙江省 七台河 1 0 0 0 2020-01-27

-- Extract the needed fields and insert the result into covid_dwm.two_day
insert into table covid_dwm.two_day partition(yearinfo,monthinfo)
select
    a.provinceName as provinceName,
    a.cityName as cityName,
    cast(a.city_confirm as int) as city_confirm,
    cast(a.city_suspect as int) as city_suspect,
    cast(a.city_cured as int) as city_cured,
    cast(a.city_dead as int) as city_dead,
    a.updateTime as updateTime,
    cast(b.city_confirm as int) as city_confirm_before,
    cast(b.city_suspect as int) as city_suspect_before,
    cast(b.city_cured as int) as city_cured_before,
    cast(b.city_dead as int) as city_dead_before,
    a.yesterday as yesterday,
    substr(a.updateTime,1,4) as yearinfo,
    substr(a.updateTime,6,2) as monthinfo
from
(select provinceName,cityName,city_confirm,city_suspect,city_cured,city_dead,updateTime,yesterday,yearinfo,monthinfo from covid_dwd.province)a
join
(select provinceName,cityName,city_confirm,city_suspect,city_cured,city_dead,updateTime from covid_dwd.province)b
on a.provinceName=b.provinceName and a.cityName=b.cityName and a.yesterday=b.updateTime;

-- Verify the insert
select * from covid_dwm.two_day limit 3;
-- 黑龙江省 七台河 2 0 0 0 2020-01-28 1 0 0 0 2020-01-27 2020 01
-- 黑龙江省 七台河 2 0 0 0 2020-01-29 2 0 0 0 2020-01-28 2020 01
-- 黑龙江省 七台河 2 0 0 0 2020-01-30 2 0 0 0 2020-01-29 2020 01
```

9. Save the full contents of two_day to /root/covid/two_day.csv on the cloud host. Environment: Hadoop/Hive

```
# Run in another bash session
hive -e "set hive.cli.print.header=true;select * from covid_dwm.two_day" | sed 's/\t/,/g' > /root/covid/two_day.csv
```

10. Under covid_dws, create the day table, used to compute each region's daily indicator increments (see the step notes for the fields). Environment: Hadoop/Hive

```
# Run in the main bash session
create database covid_dws;
use covid_dws;

-- Create the table
create table covid_dws.day(
    provinceName string comment '省份',
    cityName string comment '城市/地区',
    new_city_confirm int comment '日确诊增长人数',
    new_city_suspect int comment '日疑似增长人数',
    new_city_cured int comment '日治愈增长人数',
    new_city_dead int comment '日死亡增长人数',
    updateTime string comment '更新时间'
) comment '各省疫情每日增长表'
partitioned by(yearinfo string,monthinfo string)
row format delimited fields terminated by ','
stored as orc tblproperties('orc.compress'='SNAPPY');

-- The dwd layer already carries the previous day's values, so this layer
-- computes each region's increment as:
--   daily increment = today's value - previous day's value
-- e.g. 黑龙江省 七台河 2 0 0 0 2020-01-28 1 0 0 0 2020-01-27 2020 01

-- Dynamic partition settings
set hive.exec.dynamic.partition=true;
set hive.exec.dynamic.partition.mode=nonstrict;
set hive.exec.max.dynamic.partitions.pernode=100000;
set hive.exec.max.dynamic.partitions=100000;
set hive.exec.max.created.files=100000;
-- Hive compression
set hive.exec.compress.intermediate=true;
set hive.exec.compress.output=true;
-- Make compression take effect on write
set hive.exec.orc.compression.strategy=COMPRESSION;
-- Local mode
set hive.exec.mode.local.auto=true;
set mapreduce.map.memory.mb=1025;
set mapreduce.reduce.memory.mb=1025;
set hive.exec.mode.local.auto.input.files.max=25;

insert into table covid_dws.day partition(yearinfo,monthinfo)
select
    provinceName,
    cityName,
    (city_confirm-city_confirm_before) as new_city_confirm,
    (city_suspect-city_suspect_before) as new_city_suspect,
    (city_cured-city_cured_before) as new_city_cured,
    (city_dead-city_dead_before) as new_city_dead,
    updateTime,
    substr(updateTime,1,4) as yearinfo,
    substr(updateTime,6,2) as monthinfo
from covid_dwm.two_day;
```

11. Save the full contents of day to /root/covid/day.csv on the cloud host. Environment: Hadoop/Hive

```
# Run in another bash session
hive -e "set hive.cli.print.header=true;select * from covid_dws.day" | sed 's/\t/,/g' > /root/covid/day.csv
```

12. Create a database named covid_app; this layer produces the per-province daily increment statistics. Environment: Hadoop/Hive

```
# Run in the main bash session
create database covid_app;
use covid_app;
```
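The hive -e exports write tab-separated rows that sed rewrites into commas. A quick way to eyeball the results (just a sanity check, assuming the exports above have run) is to peek at the files and compare their line counts against count(*) in Hive:

```
# First lines of each export: the header row plus a couple of data rows
head -n 3 /root/covid/two_day.csv
head -n 3 /root/covid/day.csv
# Line counts (minus 1 for the header) should match count(*) in Hive
wc -l /root/covid/two_day.csv /root/covid/day.csv
```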
13. Under covid_app, create the app-layer business table and compute each province's daily indicator increments. Environment: Hadoop/Hive

```
# Run in the main bash session
-- Create the table
create table covid_app.day_app(
    provinceName string comment '省份',
    new_city_confirm int comment '日确诊增长人数',
    new_city_suspect int comment '日疑似增长人数',
    new_city_cured int comment '日治愈增长人数',
    new_city_dead int comment '日死亡增长人数',
    updateTime string comment '更新时间'
) comment '全国各省疫情每日新增人数统计表'
partitioned by(yearinfo string,monthinfo string)
row format delimited fields terminated by ','
stored as orc tblproperties('orc.compress'='SNAPPY');

-- Insert the data
insert into table covid_app.day_app partition(yearinfo,monthinfo)
select
    provinceName,
    sum(new_city_confirm) as new_city_confirm,
    sum(new_city_suspect) as new_city_suspect,
    sum(new_city_cured) as new_city_cured,
    sum(new_city_dead) as new_city_dead,
    updateTime,
    yearinfo,
    monthinfo
from covid_dws.day
group by yearinfo,monthinfo,provinceName,updateTime;

-- Verify the data
select * from covid_app.day_app limit 10;
```

14. Save the full contents of day_app to /root/covid/day_app.csv on the cloud host. Environment: Hadoop/Hive

```
# Run in another bash session
hive -e "set hive.cli.print.header=true;select * from covid_app.day_app" | sed 's/\t/,/g' > /root/covid/day_app.csv
```
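With the app layer in place, downstream questions become one-liners. As an illustration (a sketch, not one of the lab steps; the date is taken from the sample rows shown earlier), ranking provinces by new confirmed cases on a given day looks like:

```
-- Provinces with the most new confirmed cases on 2020-01-28
select provinceName, new_city_confirm
from covid_app.day_app
where updateTime='2020-01-28'
order by new_city_confirm desc
limit 10;
```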
LazzMan · 2023-10-25 21:26