# University Major and Discipline Analysis
## Starting the single-node cluster environment

1. Edit the cloud host's hosts file and map its internal IP to the hostname hadoop000, then set up passwordless SSH so the host can log in to hadoop000 as root.

Environment: Hadoop/Hive/Spark

```
echo "172.18.39.202 hadoop000" >> /etc/hosts
ssh-keygen
ssh-copy-id -i /root/.ssh/id_rsa.pub localhost
```

2. Change the ResourceManager web UI port to 23456 (in yarn-site.xml, as shown below), then format the HDFS file system.

Environment: Hadoop/Hive/Spark

```
vim /root/software/hadoop-2.7.7/etc/hadoop/yarn-site.xml

<property>
    <name>yarn.resourcemanager.webapp.address</name>
    <value>hadoop000:23456</value>
</property>

# Add the following to core-site.xml so that clients access DataNodes by hostname
vim /root/software/hadoop-2.7.7/etc/hadoop/core-site.xml

<property>
    <name>dfs.client.use.datanode.hostname</name>
    <value>true</value>
</property>

hdfs namenode -format
```

3. Start the Hadoop cluster.

Environment: Hadoop/Hive/Spark

```
cd /root/software/hadoop-2.7.7/sbin
./start-all.sh
```

## Nationwide university data analysis

> Edit the local hosts file and map `hadoop000` to the cloud host's public IP; the local Eclipse development environment accesses the cluster by this hostname.
> Debugging MapReduce programs locally on Windows requires three files: hadoop-2.x.x.tar.gz, hadoop.dll and winutils.exe.
> Unpack hadoop-2.6.5.tar.gz into a Windows directory of your choice, e.g. `E:\WorkSpace\Software\hadoop-2.6.5`.
> Configure HADOOP_HOME and HADOOP_USER_NAME as system environment variables. HADOOP_USER_NAME is the user identity used when accessing HDFS; when debugging on Windows the Windows user is used by default, and that user has no permission to read or write HDFS, so it must be set to a Linux user that does have permission, in this case root.
> Put winutils.exe under %HADOOP_HOME%\bin.
> Put hadoop.dll under C:\Windows\System32.

Preparation 1: upload the data files to HDFS.

```
hdfs dfs -mkdir /major
hdfs dfs -put /root/major/* /major
```

Preparation 2: create a new Maven project with the following pom:

```
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.xtkg</groupId>
    <artifactId>schoolanalysis</artifactId>
    <version>1.0-SNAPSHOT</version>

    <dependencies>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>2.7.5</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-mapreduce-client-core</artifactId>
            <version>2.7.5</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-hdfs</artifactId>
            <version>2.7.5</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-mapreduce-client-common</artifactId>
            <version>2.7.5</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>2.3.2</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>
```
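Before running the jobs below, it can help to confirm that the Hadoop daemons are up and that the input files from Preparation 1 actually landed in HDFS. This is an optional sanity check, not part of the required steps:

```
# NameNode, DataNode, ResourceManager, NodeManager ... should be listed
jps

# the uploaded data files should appear here
hdfs dfs -ls /major
```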
1. Analyze the distribution of institution categories (办学类别) across universities nationwide and save the result locally to /root/major/type/part-r-00000 (format: category count).

Environment: Hadoop/Hive/Spark

```
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

/**
 * 1. Count the number of universities in each institution category nationwide.
 */
public class EducationType {

    public static class EducationTypeMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String line = value.toString();
            try {
                if (key.get() == 0) {
                    // skip the header row
                    return;
                } else {
                    // split the record
                    String[] splits = line.split(",");
                    // the address field may itself contain commas, so the record can have
                    // 12-14 fields; pick the category column according to the field count
                    switch (splits.length) {
                        case 14:
                            context.write(new Text(splits[7]), new IntWritable(1));
                            break;
                        case 13:
                            context.write(new Text(splits[6]), new IntWritable(1));
                            break;
                        case 12:
                            context.write(new Text(splits[5]), new IntWritable(1));
                            break;
                    }
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    public static class EducationTypeReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
            // running total for this category
            int sum = 0;
            if (key.getLength() != 0) {
                for (IntWritable value : values) {
                    sum += value.get();
                }
                // emit the category and its total
                context.write(key, new IntWritable(sum));
            }
        }
    }

    public static void main(String[] args) throws IOException {
        // create the configuration object
        Configuration conf = new Configuration();
        // point the client at the HDFS file system
        conf.set("dfs.client.use.datanode.hostname", "true");
        conf.set("fs.defaultFS", "hdfs://hadoop000:9000"); // NameNode URI; used to obtain the DistributedFileSystem instance
        // create the job instance
        Job job = Job.getInstance(conf);
        // class used to locate the job jar
        job.setJarByClass(EducationType.class);
        // Mapper and Reducer implementations
        job.setMapperClass(EducationTypeMapper.class);
        job.setReducerClass(EducationTypeReducer.class);
        // output key/value types of the MapTask (optional here) and ReduceTask
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        // input and output paths
        Path inputPath = new Path("/major/school.csv");
        Path outputPath = new Path("/major/type/");
        // delete the output path if it already exists
        FileSystem fileSystem = FileSystem.get(conf);
        if (fileSystem.exists(outputPath)) {
            fileSystem.delete(outputPath, true);
        }
        FileInputFormat.setInputPaths(job, inputPath);
        FileOutputFormat.setOutputPath(job, outputPath);
        // submit the job and print 0 on success, 1 on failure
        try {
            System.out.println(job.waitForCompletion(true) ? 0 : 1);
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ClassNotFoundException e) {
            e.printStackTrace();
        }
    }
}
```
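To spot-check the output of this first job before moving on, the part file can be read directly from HDFS; the path below simply mirrors the outputPath used in the driver above:

```
# each line is: institution category <TAB> count
hdfs dfs -cat /major/type/part-r-00000
```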
2. Count the number of universities in each province and sort the result in descending order of count; save it locally to /root/major/sortprovince/part-r-00000 (format: province count).

Environment: Hadoop/Hive/Spark

Run the aggregation job first; the sort job further below reads its output.

```
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

/**
 * 2.1 Count the number of universities in each province.
 */
public class Province_School {

    /** Map side: read the province field from school.csv and emit (province, 1). */
    public static class ProvinceMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            // read the record
            String line = value.toString();
            if (key.get() == 0) {
                // skip the header row
                return;
            } else {
                // split the record and take the province field
                String[] lines = line.split(",");
                String province = lines[1];
                // send the pair downstream
                context.write(new Text(province), new IntWritable(1));
            }
        }
    }

    /** Reduce side: aggregate by province to get the total number of universities. */
    public static class ProvinceReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
            // running total
            int sum = 0;
            for (IntWritable value : values) {
                sum += value.get();
            }
            // emit the result
            context.write(key, new IntWritable(sum));
            System.out.println("Result: " + key + ":" + sum);
        }
    }

    /** Program entry point: create and configure the job. */
    public static void main(String[] args) throws IOException {
        // create the configuration object
        Configuration conf = new Configuration();
        // point the client at the HDFS file system
        conf.set("dfs.client.use.datanode.hostname", "true");
        conf.set("fs.defaultFS", "hdfs://hadoop000:9000"); // NameNode URI; used to obtain the DistributedFileSystem instance
        // create the job instance
        Job job = Job.getInstance(conf);
        // class used to locate the job jar
        job.setJarByClass(Province_School.class);
        // Mapper and Reducer implementations
        job.setMapperClass(ProvinceMapper.class);
        job.setReducerClass(ProvinceReducer.class);
        job.setNumReduceTasks(1);
        // output key/value types of the MapTask (optional here) and ReduceTask
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        // input and output paths
        Path inputPath = new Path("/major/school.csv");
        Path outputPath = new Path("/major/province");
        // delete the output path if it already exists
        FileSystem fileSystem = FileSystem.get(conf);
        if (fileSystem.exists(outputPath)) {
            fileSystem.delete(outputPath, true);
        }
        FileInputFormat.setInputPaths(job, inputPath);
        FileOutputFormat.setOutputPath(job, outputPath);
        // submit the job and print 0 on success, 1 on failure
        try {
            System.out.println(job.waitForCompletion(true) ? 0 : 1);
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ClassNotFoundException e) {
            e.printStackTrace();
        }
    }
}
```

```
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

/**
 * 2.2 Sort the per-province counts produced by Province_School in descending order.
 */
public class SortProvince {

    /** Map side: swap key and value so Hadoop's shuffle sorts the records by count. */
    public static class SortProvinceMapper extends Mapper<LongWritable, Text, IntWritable, Text> {
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            // read a record produced by the previous job
            String line = value.toString();
            // split into province and count
            String[] splits = line.split("\t");
            if (splits.length == 2) {
                // emit (count, province)
                context.write(new IntWritable(Integer.valueOf(splits[1])), new Text(splits[0]));
            }
        }
    }

    /** Reduce side: swap key and value back before writing the output. */
    public static class SortProvinceReducer extends Reducer<IntWritable, Text, Text, IntWritable> {
        @Override
        protected void reduce(IntWritable key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
            // several provinces may share the same count
            for (Text value : values) {
                context.write(new Text(value), key);
                System.out.println("Result: " + value + ":" + key);
            }
        }
    }

    /** Custom comparator that inverts IntWritable's natural order, giving a descending sort. */
    public static class MyComparator extends IntWritable.Comparator {
        @Override
        public int compare(Object a, Object b) {
            return -super.compare(a, b);
        }

        @Override
        public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
            return -super.compare(b1, s1, l1, b2, s2, l2);
        }
    }

    /** Program entry point: create and configure the job. */
    public static void main(String[] args) throws IOException {
        // create the configuration object
        Configuration conf = new Configuration();
        // point the client at the HDFS file system
        conf.set("dfs.client.use.datanode.hostname", "true");
        conf.set("fs.defaultFS", "hdfs://hadoop000:9000"); // NameNode URI; used to obtain the DistributedFileSystem instance
        // create the job instance
        Job job = Job.getInstance(conf);
        // class used to locate the job jar
        job.setJarByClass(SortProvince.class);
        // Mapper and Reducer implementations
        job.setMapperClass(SortProvinceMapper.class);
        job.setReducerClass(SortProvinceReducer.class);
        job.setNumReduceTasks(1);
        // custom sort comparator for descending order
        job.setSortComparatorClass(MyComparator.class);
        // output key/value types of the MapTask and ReduceTask
        job.setMapOutputKeyClass(IntWritable.class);
        job.setMapOutputValueClass(Text.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        // input and output paths
        Path inputPath = new Path("/major/province/part-r-00000");
        Path outputPath = new Path("/major/sortprovince");
        // delete the output path if it already exists
        FileSystem fileSystem = FileSystem.get(conf);
        if (fileSystem.exists(outputPath)) {
            fileSystem.delete(outputPath, true);
        }
        FileInputFormat.setInputPaths(job, inputPath);
        FileOutputFormat.setOutputPath(job, outputPath);
        // submit the job and print 0 on success, 1 on failure
        try {
            System.out.println(job.waitForCompletion(true) ? 0 : 1);
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ClassNotFoundException e) {
            e.printStackTrace();
        }
    }
}
```

3. Count the number of Project 985 universities in each province and save the result locally to /root/major/985/part-r-00000 (format: province count).

Environment: Hadoop/Hive/Spark

```
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

/**
 * 3. Count the provinces in which Project 985 universities are located.
 */
public class School_NEF {

    public static class School_NEFMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String line = value.toString();
            if (key.get() == 0) {
                // skip the header row
                return;
            } else {
                // split the record; index the 985 flag from the end so that commas
                // inside the address field do not shift the column position
                String[] splits = line.split(",");
                if (splits[splits.length - 5].equals("是")) {
                    context.write(new Text(splits[1]), new IntWritable(1));
                }
            }
        }
    }

    public static class School_NEFReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
            // running total for this province
            int sum = 0;
            for (IntWritable value : values) {
                sum += value.get();
            }
            // emit the result
            context.write(key, new IntWritable(sum));
        }
    }

    public static void main(String[] args) throws IOException {
        // create the configuration object
        Configuration conf = new Configuration();
        // point the client at the HDFS file system
        conf.set("dfs.client.use.datanode.hostname", "true");
        conf.set("fs.defaultFS", "hdfs://hadoop000:9000"); // NameNode URI; used to obtain the DistributedFileSystem instance
        // create the job instance
        Job job = Job.getInstance(conf);
        // class used to locate the job jar
        job.setJarByClass(School_NEF.class);
        // Mapper and Reducer implementations
        job.setMapperClass(School_NEFMapper.class);
        job.setReducerClass(School_NEFReducer.class);
        // output key/value types of the MapTask (optional here) and ReduceTask
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        // input and output paths
        Path inputPath = new Path("/major/school.csv");
        Path outputPath = new Path("/major/985");
        // delete the output path if it already exists
        FileSystem fileSystem = FileSystem.get(conf);
        if (fileSystem.exists(outputPath)) {
            fileSystem.delete(outputPath, true);
        }
        FileInputFormat.setInputPaths(job, inputPath);
        FileOutputFormat.setOutputPath(job, outputPath);
        // submit the job and print 0 on success, 1 on failure
        try {
            System.out.println(job.waitForCompletion(true) ? 0 : 1);
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ClassNotFoundException e) {
            e.printStackTrace();
        }
    }
}
```

4. Count the number of Project 211 universities in each province and save the result locally to /root/major/211/part-r-00000 (format: province count).

Environment: Hadoop/Hive/Spark

```
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

/**
 * 4. Count the provinces in which Project 211 universities are located.
 */
public class School_TOO {

    public static class School_TOOMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String line = value.toString();
            if (key.get() == 0) {
                // skip the header row
                return;
            } else {
                // split the record; the 211 flag is the 4th field from the end
                String[] splits = line.split(",");
                if (splits[splits.length - 4].equals("是")) {
                    context.write(new Text(splits[1]), new IntWritable(1));
                }
            }
        }
    }

    public static class School_TOOReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
            // running total for this province
            int sum = 0;
            for (IntWritable value : values) {
                sum += value.get();
            }
            // emit the result
            context.write(key, new IntWritable(sum));
        }
    }

    public static void main(String[] args) throws IOException {
        // create the configuration object
        Configuration conf = new Configuration();
        // point the client at the HDFS file system
        conf.set("dfs.client.use.datanode.hostname", "true");
        conf.set("fs.defaultFS", "hdfs://hadoop000:9000"); // NameNode URI; used to obtain the DistributedFileSystem instance
        // create the job instance
        Job job = Job.getInstance(conf);
        // class used to locate the job jar
        job.setJarByClass(School_TOO.class);
        // Mapper and Reducer implementations
        job.setMapperClass(School_TOOMapper.class);
        job.setReducerClass(School_TOOReducer.class);
        // output key/value types of the MapTask (optional here) and ReduceTask
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        // input and output paths
        Path inputPath = new Path("/major/school.csv");
        Path outputPath = new Path("/major/211");
        // delete the output path if it already exists
        FileSystem fileSystem = FileSystem.get(conf);
        if (fileSystem.exists(outputPath)) {
            fileSystem.delete(outputPath, true);
        }
        FileInputFormat.setInputPaths(job, inputPath);
        FileOutputFormat.setOutputPath(job, outputPath);
        // submit the job and print 0 on success, 1 on failure
        try {
            System.out.println(job.waitForCompletion(true) ? 0 : 1);
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ClassNotFoundException e) {
            e.printStackTrace();
        }
    }
}
```
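Jobs 3 and 4 differ only in which flag column they test (the 985 flag is the 5th field from the end, the 211 flag the 4th). Since every Project 985 university is also a Project 211 university, an optional cross-check once both jobs have run is that no province appears in the 985 output without also appearing in the 211 output; a sketch of that check, assuming the output paths above:

```
# extract the province column (field 1, tab-separated) from both results
hdfs dfs -cat /major/985/part-r-00000 | cut -f1 | sort > /tmp/p985.txt
hdfs dfs -cat /major/211/part-r-00000 | cut -f1 | sort > /tmp/p211.txt

# provinces that have 985 schools but no 211 schools; should normally print nothing
comm -23 /tmp/p985.txt /tmp/p211.txt
```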
5. Count the number of double first-class (双一流) universities in each province and save the result locally to /root/major/doublefc/part-r-00000 (format: province count).

Environment: Hadoop/Hive/Spark

```
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

/**
 * 5. Count double first-class universities per province.
 */
public class School_FirstRate {

    public static class School_FirstRateMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String line = value.toString();
            // split the record; the double first-class flag is the 3rd field from the end
            // (the header row's flag column is a column name rather than "是", so it is filtered out naturally)
            String[] splits = line.split(",");
            if (splits[splits.length - 3].equals("是")) {
                context.write(new Text(splits[1]), new IntWritable(1));
            }
        }
    }

    public static class School_FirstRateReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
            // running total for this province
            int sum = 0;
            for (IntWritable value : values) {
                sum += value.get();
            }
            // emit the result
            context.write(key, new IntWritable(sum));
        }
    }

    public static void main(String[] args) throws IOException {
        // create the configuration object
        Configuration conf = new Configuration();
        // point the client at the HDFS file system
        conf.set("dfs.client.use.datanode.hostname", "true");
        conf.set("fs.defaultFS", "hdfs://hadoop000:9000"); // NameNode URI; used to obtain the DistributedFileSystem instance
        // create the job instance
        Job job = Job.getInstance(conf);
        // class used to locate the job jar
        job.setJarByClass(School_FirstRate.class);
        // Mapper and Reducer implementations
        job.setMapperClass(School_FirstRateMapper.class);
        job.setReducerClass(School_FirstRateReducer.class);
        // output key/value types of the MapTask (optional here) and ReduceTask
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        // input and output paths
        Path inputPath = new Path("/major/school.csv");
        Path outputPath = new Path("/major/doublefc");
        // delete the output path if it already exists
        FileSystem fileSystem = FileSystem.get(conf);
        if (fileSystem.exists(outputPath)) {
            fileSystem.delete(outputPath, true);
        }
        FileInputFormat.setInputPaths(job, inputPath);
        FileOutputFormat.setOutputPath(job, outputPath);
        // submit the job and print 0 on success, 1 on failure
        try {
            System.out.println(job.waitForCompletion(true) ? 0 : 1);
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ClassNotFoundException e) {
            e.printStackTrace();
        }
    }
}
```

6. Analyze the distribution of university levels (水平层次) nationwide and save the result locally to /root/major/level/part-r-00000 (format: level count).

Environment: Hadoop/Hive/Spark

```
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

/**
 * 6. Count the number of universities at each level.
 */
public class HorizontalLevel_1 {

    public static class HorizontalLevelMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String line = value.toString();
            try {
                if (key.get() == 0) {
                    // skip the header row
                    return;
                } else {
                    // split the record
                    String[] splits = line.split(",");
                    // the address field may contain commas, so pick the level column by field count
                    switch (splits.length) {
                        case 14:
                            context.write(new Text(splits[6]), new IntWritable(1));
                            break;
                        case 13:
                            context.write(new Text(splits[5]), new IntWritable(1));
                            break;
                        case 12:
                            context.write(new Text(splits[4]), new IntWritable(1));
                            break;
                    }
                    // context.write(new Text(splits[splits.length - 8]), new IntWritable(1));
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    public static class HorizontalLevelReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
            // running total for this level
            int sum = 0;
            for (IntWritable value : values) {
                sum += value.get();
            }
            // emit the result
            context.write(key, new IntWritable(sum));
        }
    }

    public static void main(String[] args) throws IOException {
        // create the configuration object
        Configuration conf = new Configuration();
        // point the client at the HDFS file system
        conf.set("dfs.client.use.datanode.hostname", "true");
        conf.set("fs.defaultFS", "hdfs://hadoop000:9000"); // NameNode URI; used to obtain the DistributedFileSystem instance
        // create the job instance
        Job job = Job.getInstance(conf);
        // class used to locate the job jar
        job.setJarByClass(HorizontalLevel_1.class);
        // Mapper and Reducer implementations
        job.setMapperClass(HorizontalLevelMapper.class);
        job.setReducerClass(HorizontalLevelReducer.class);
        // output key/value types of the MapTask (optional here) and ReduceTask
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        // input and output paths
        Path inputPath = new Path("/major/school.csv");
        Path outputPath = new Path("/major/level");
        // delete the output path if it already exists
        FileSystem fileSystem = FileSystem.get(conf);
        if (fileSystem.exists(outputPath)) {
            fileSystem.delete(outputPath, true);
        }
        FileInputFormat.setInputPaths(job, inputPath);
        FileOutputFormat.setOutputPath(job, outputPath);
        // submit the job and print 0 on success, 1 on failure
        try {
            System.out.println(job.waitForCompletion(true) ? 0 : 1);
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ClassNotFoundException e) {
            e.printStackTrace();
        }
    }
}
```

7. Analyze the national-feature majors (特色专业) of universities and save the result locally to /root/major/specmajor/part-r-00000 (format: school total number of feature majors).

Environment: Hadoop/Hive/Spark

```
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

/**
 * Overall goal: compute each university's share of national-feature majors.
 * 7. Requirement 1: count the number of national-feature majors per university.
 * (Requirement 2, computing the proportion, is not implemented in this class.)
 */
public class Specialty_1 {

    public static class Specialty_1Mapper extends Mapper<LongWritable, Text, Text, IntWritable> {
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            // read the record
            String line = value.toString();
            // split the record
            String[] splits = line.split(",");
            // keep only majors flagged as national-feature majors
            if (splits.length == 4 && splits[3].equals("是")) {
                context.write(new Text(splits[0]), new IntWritable(1));
            }
        }
    }

    public static class Specialty_1Reducer extends Reducer<Text, IntWritable, Text, IntWritable> {
        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
            // running total for this school
            int sum = 0;
            for (IntWritable value : values) {
                sum += value.get();
            }
            // emit the result
            context.write(key, new IntWritable(sum));
        }
    }

    public static void main(String[] args) throws IOException {
        // create the configuration object
        Configuration conf = new Configuration();
        // point the client at the HDFS file system
        conf.set("dfs.client.use.datanode.hostname", "true");
        conf.set("fs.defaultFS", "hdfs://hadoop000:9000"); // NameNode URI; used to obtain the DistributedFileSystem instance
        // create the job instance
        Job job = Job.getInstance(conf);
        // class used to locate the job jar
        job.setJarByClass(Specialty_1.class);
        // Mapper and Reducer implementations
        job.setMapperClass(Specialty_1Mapper.class);
        job.setReducerClass(Specialty_1Reducer.class);
        // output key/value types of the MapTask (optional here) and ReduceTask
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        // input and output paths
        Path inputPath = new Path("/major/professional.csv");
        Path outputPath = new Path("/major/specmajor");
        // delete the output path if it already exists
        FileSystem fileSystem = FileSystem.get(conf);
        if (fileSystem.exists(outputPath)) {
            fileSystem.delete(outputPath, true);
        }
        FileInputFormat.setInputPaths(job, inputPath);
        FileOutputFormat.setOutputPath(job, outputPath);
        // submit the job and print 0 on success, 1 on failure
        try {
            System.out.println(job.waitForCompletion(true) ? 0 : 1);
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ClassNotFoundException e) {
            e.printStackTrace();
        }
    }
}
```

8. Count the frequency of every major whose name contains "大数据" (big data) and save the result locally to /root/major/bigdatamajor/part-r-00000 (format: major name count).

Environment: Hadoop/Hive/Spark

```
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

/**
 * 8. Count the frequency of every major whose name contains "大数据".
 */
public class BigData {

    public static class MyMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            // read the record
            String line = value.toString();
            if (key.get() == 0) {
                // skip the header row
                return;
            }
            // split the record and take the major-name field
            String[] splits = line.split(",");
            String professionalName = splits[2];
            // keep only majors whose name contains "大数据"
            if (professionalName.contains("大数据")) {
                context.write(new Text(professionalName), new IntWritable(1));
            }
        }
    }

    public static class MyReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
            // running total for this major name
            int sum = 0;
            for (IntWritable value : values) {
                sum += value.get();
            }
            // emit the result
            context.write(key, new IntWritable(sum));
        }
    }

    public static void main(String[] args) throws IOException {
        // create the configuration object
        Configuration conf = new Configuration();
        // point the client at the HDFS file system
        conf.set("dfs.client.use.datanode.hostname", "true");
        conf.set("fs.defaultFS", "hdfs://hadoop000:9000"); // NameNode URI; used to obtain the DistributedFileSystem instance
        // create the job instance
        Job job = Job.getInstance(conf);
        // class used to locate the job jar
        job.setJarByClass(BigData.class);
        // Mapper and Reducer implementations
        job.setMapperClass(MyMapper.class);
        job.setReducerClass(MyReducer.class);
        // output key/value types of the MapTask (optional here) and ReduceTask
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        // input and output paths
        Path inputPath = new Path("/major/professional.csv");
        Path outputPath = new Path("/major/bigdatamajor/");
        // delete the output path if it already exists
        FileSystem fileSystem = FileSystem.get(conf);
        if (fileSystem.exists(outputPath)) {
            fileSystem.delete(outputPath, true);
        }
        FileInputFormat.setInputPaths(job, inputPath);
        FileOutputFormat.setOutputPath(job, outputPath);
        // submit the job and print 0 on success, 1 on failure
        try {
            System.out.println(job.waitForCompletion(true) ? 0 : 1);
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ClassNotFoundException e) {
            e.printStackTrace();
        }
    }
}
```

9. After all of the jobs above have finished, export the results from HDFS to the cloud host's local file system, then verify them.

```
hdfs dfs -get /major /root
```
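A quick way to confirm the export worked is to list the local directory tree and look at a couple of the part files; this is only a sanity check and the two files shown are examples:

```
# list the exported result directories
ls /root/major

# peek at two of the result files (format: key <TAB> count)
head /root/major/type/part-r-00000 /root/major/sortprovince/part-r-00000
```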