互联网日志分析


开启单节点集群环境

1.修改云主机hosts文件,添加内网IP,对应映射名为hadoop000,实现云主机自身使用root用户ssh访问hadoop000免密登录
操作环境:
Hadoop/Hive/Spark

echo "172.18.0.248 hadoop000" >> /etc/hosts

ssh-keygen -t rsa

ssh-copy-id -i /root/.ssh/id_rsa.pub root@hadoop000

2.格式化HDFS文件系统
操作环境:
Hadoop/Hive/Spark

hdfs namenode -format

3.启动Hadoop集群
操作环境:
Hadoop/Hive/Spark

start-all.sh

互联网日志分析

1.将本地数据/root/internetlogs/journal.log上传至HDFS文件系统/input/下,注意自行创建目录。
操作环境:Hadoop/Hive/Spark

hdfs dfs -mkdir /input
hdfs dfs -put /root/internetlogs/journal.log /input/

2.编写程序进行页面访问量统计,结果保存至本地/root/internetlogs/pv/目录下part-00000文件中
操作环境:Hadoop/Hive/Spark

Java代码(PageViewCount.java):
/**
 * MapReduce job that counts page views: the number of log lines requesting each page.
 *
 * <p>Every input line is split on single spaces and the token at index
 * {@link #PAGE_FIELD} is used as the page. Lines with too few tokens are skipped
 * and tallied in the {@code PageViewCount/MALFORMED_LINES} counter instead of
 * failing the task.
 *
 * <p>Usage: {@code PageViewCount [inputPath [outputPath]]}. Missing arguments fall
 * back to {@code /input} and {@code /output/pv}.
 */
public class PageViewCount extends Configured implements Tool {
    /** Input path used when none is given on the command line. */
    private static final String DEFAULT_INPUT = "/input";
    /** Output path used when none is given on the command line. */
    private static final String DEFAULT_OUTPUT = "/output/pv";
    /** Index of the requested page among the space-separated tokens of a line. */
    private static final int PAGE_FIELD = 6;

    /** Emits {@code (page, 1)} for every well-formed log line. */
    public static class PVMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
        // Writables are reused across calls; the framework serializes them on write().
        private static final IntWritable ONE = new IntWritable(1);
        private final Text page = new Text();

        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String[] fields = value.toString().split(" ");
            if (fields.length <= PAGE_FIELD) {
                // Blank or truncated line: record it and move on rather than crash the task.
                context.getCounter("PageViewCount", "MALFORMED_LINES").increment(1);
                return;
            }
            page.set(fields[PAGE_FIELD]);
            context.write(page, ONE);
        }
    }

    /** Sums the partial counts of one page; also used as the combiner. */
    public static class PVReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
        private final IntWritable total = new IntWritable();

        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable value : values) {
                sum += value.get();
            }
            total.set(sum);
            context.write(key, total);
        }
    }

    /**
     * Configures and runs the job.
     *
     * @param args optional {@code [inputPath, outputPath]}; defaults apply when absent
     * @return 0 if the job succeeded, 1 otherwise
     * @throws Exception if the job cannot be set up or submitted
     */
    @Override
    public int run(String[] args) throws Exception {
        String input = args.length > 0 ? args[0] : DEFAULT_INPUT;
        String output = args.length > 1 ? args[1] : DEFAULT_OUTPUT;

        Job job = Job.getInstance(getConf());
        job.setJarByClass(PageViewCount.class);
        job.setMapperClass(PVMapper.class);
        // Summation is associative and the reducer's input/output types match,
        // so it can pre-aggregate on the map side.
        job.setCombinerClass(PVReducer.class);
        job.setReducerClass(PVReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.addInputPath(job, new Path(input));
        FileOutputFormat.setOutputPath(job, new Path(output));
        return job.waitForCompletion(true) ? 0 : 1;
    }

    /** Command-line entry point; exits with the status returned by {@link #run}. */
    public static void main(String[] args) throws Exception {
        int exitCode = ToolRunner.run(new PageViewCount(), args);
        System.exit(exitCode);
    }
}

运行MapReduce程序,从HDFS下载结果文件到本地:

hadoop jar PageViewCount.jar /input /output/pv

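# 新版MapReduce API的输出文件名为part-r-00000,这里用getmerge合并后保存为题目要求的part-00000
mkdir -p /root/internetlogs/pv
hdfs dfs -getmerge /output/pv /root/internetlogs/pv/part-00000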

3.编写程序进行页面独立IP的访问量统计,结果保存至本地/root/internetlogs/ip/目录下part-00000文件中,例如1.80.249.223 1表示此IP访问量为1
操作环境:Hadoop/Hive/Spark

/**
 * MapReduce job that counts visits per client IP.
 *
 * <p>The task statement asks for one output line per IP with its visit count
 * (for example {@code 1.80.249.223 1}), so the job is keyed by IP rather than
 * by page. Every input line is split on single spaces; the token at index
 * {@link #IP_FIELD} is the client IP. Lines with an empty IP token are skipped and
 * tallied in the {@code UniqueIPCount/MALFORMED_LINES} counter.
 *
 * <p>Usage: {@code UniqueIPCount [inputPath [outputPath]]}. Missing arguments fall
 * back to {@code /input} and {@code /output/ip}.
 */
public class UniqueIPCount extends Configured implements Tool {
    /** Input path used when none is given on the command line. */
    private static final String DEFAULT_INPUT = "/input";
    /** Output path used when none is given on the command line. */
    private static final String DEFAULT_OUTPUT = "/output/ip";
    /** Index of the client IP among the space-separated tokens of a line. */
    private static final int IP_FIELD = 0;

    /** Emits {@code (ip, "1")} for every log line that carries a client IP. */
    public static class IPMapper extends Mapper<LongWritable, Text, Text, Text> {
        // One marker per visit; the reducer only counts how many markers arrive.
        private static final Text VISIT = new Text("1");
        private final Text ip = new Text();

        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String[] fields = value.toString().split(" ");
            if (fields.length <= IP_FIELD || fields[IP_FIELD].isEmpty()) {
                // Blank line or line starting with a space: nothing to attribute the visit to.
                context.getCounter("UniqueIPCount", "MALFORMED_LINES").increment(1);
                return;
            }
            ip.set(fields[IP_FIELD]);
            context.write(ip, VISIT);
        }
    }

    /**
     * Writes {@code (ip, visits)} where {@code visits} is the number of values received.
     * Because it counts values rather than summing them, it must not be used as a combiner.
     */
    public static class IPReducer extends Reducer<Text, Text, Text, Text> {
        private final Text visits = new Text();

        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
            long count = 0;
            for (Text ignored : values) {
                count++;
            }
            visits.set(String.valueOf(count));
            context.write(key, visits);
        }
    }

    /**
     * Configures and runs the job.
     *
     * @param args optional {@code [inputPath, outputPath]}; defaults apply when absent
     * @return 0 if the job succeeded, 1 otherwise
     * @throws Exception if the job cannot be set up or submitted
     */
    @Override
    public int run(String[] args) throws Exception {
        String input = args.length > 0 ? args[0] : DEFAULT_INPUT;
        String output = args.length > 1 ? args[1] : DEFAULT_OUTPUT;

        Job job = Job.getInstance(getConf());
        job.setJarByClass(UniqueIPCount.class);
        job.setMapperClass(IPMapper.class);
        job.setReducerClass(IPReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        FileInputFormat.addInputPath(job, new Path(input));
        FileOutputFormat.setOutputPath(job, new Path(output));
        return job.waitForCompletion(true) ? 0 : 1;
    }

    /** Command-line entry point; exits with the status returned by {@link #run}. */
    public static void main(String[] args) throws Exception {
        int exitCode = ToolRunner.run(new UniqueIPCount(), args);
        System.exit(exitCode);
    }
}

运行MapReduce程序,从HDFS下载结果文件到本地:

hadoop jar UniqueIPCount.jar /input /output/ip

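# 新版MapReduce API的输出文件名为part-r-00000,这里用getmerge合并后保存为题目要求的part-00000
mkdir -p /root/internetlogs/ip
hdfs dfs -getmerge /output/ip /root/internetlogs/ip/part-00000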

4.编写程序进行每小时访问网站的次数统计,结果保存至本地/root/internetlogs/time/目录下part-00000文件中
操作环境:Hadoop/Hive/Spark

/**
 * MapReduce job that counts visits per hour.
 *
 * <p>Every input line is split on single spaces; the token at index
 * {@link #TIME_FIELD} is expected to look like {@code [18/Sep/2013:06:49:18}.
 * The key is the date part followed directly by the two-digit hour, e.g.
 * {@code 18/Sep/201306}. Lines without a usable timestamp are skipped and tallied
 * in the {@code HourlyVisitCount/MALFORMED_LINES} counter.
 *
 * <p>Usage: {@code HourlyVisitCount [inputPath [outputPath]]}. Missing arguments
 * fall back to {@code /input} and {@code /output/time}.
 */
public class HourlyVisitCount extends Configured implements Tool {
    /** Input path used when none is given on the command line. */
    private static final String DEFAULT_INPUT = "/input";
    /** Output path used when none is given on the command line. */
    private static final String DEFAULT_OUTPUT = "/output/time";
    /** Index of the timestamp among the space-separated tokens of a line. */
    private static final int TIME_FIELD = 3;

    /**
     * Builds the per-hour key from a timestamp token.
     *
     * <p>The date is everything between the optional leading {@code '['} and the
     * first {@code ':'}; the hour is the two characters after that colon. Locating
     * the colon avoids the fixed offsets that previously cut the year short and
     * never reached the hour digits.
     *
     * @param timestamp timestamp token such as {@code [18/Sep/2013:06:49:18}
     * @return date followed by hour, or {@code null} if the token has no date or hour
     */
    private static String toHourlyKey(String timestamp) {
        int start = timestamp.startsWith("[") ? 1 : 0;
        int colon = timestamp.indexOf(':');
        if (colon <= start || timestamp.length() < colon + 3) {
            return null;
        }
        String date = timestamp.substring(start, colon);
        String hour = timestamp.substring(colon + 1, colon + 3);
        return date + hour;
    }

    /** Emits {@code (date+hour, 1)} for every log line with a parseable timestamp. */
    public static class TimeMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
        // Writables are reused across calls; the framework serializes them on write().
        private static final IntWritable ONE = new IntWritable(1);
        private final Text hourlyKey = new Text();

        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String[] fields = value.toString().split(" ");
            String parsed = fields.length > TIME_FIELD ? toHourlyKey(fields[TIME_FIELD]) : null;
            if (parsed == null) {
                // Blank, truncated or oddly formatted line: record it and move on.
                context.getCounter("HourlyVisitCount", "MALFORMED_LINES").increment(1);
                return;
            }
            hourlyKey.set(parsed);
            context.write(hourlyKey, ONE);
        }
    }

    /** Sums the partial counts of one hour; also used as the combiner. */
    public static class TimeReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
        private final IntWritable total = new IntWritable();

        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable value : values) {
                sum += value.get();
            }
            total.set(sum);
            context.write(key, total);
        }
    }

    /**
     * Configures and runs the job.
     *
     * @param args optional {@code [inputPath, outputPath]}; defaults apply when absent
     * @return 0 if the job succeeded, 1 otherwise
     * @throws Exception if the job cannot be set up or submitted
     */
    @Override
    public int run(String[] args) throws Exception {
        String input = args.length > 0 ? args[0] : DEFAULT_INPUT;
        String output = args.length > 1 ? args[1] : DEFAULT_OUTPUT;

        Job job = Job.getInstance(getConf());
        job.setJarByClass(HourlyVisitCount.class);
        job.setMapperClass(TimeMapper.class);
        // Summation is associative and the reducer's input/output types match,
        // so it can pre-aggregate on the map side.
        job.setCombinerClass(TimeReducer.class);
        job.setReducerClass(TimeReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.addInputPath(job, new Path(input));
        FileOutputFormat.setOutputPath(job, new Path(output));
        return job.waitForCompletion(true) ? 0 : 1;
    }

    /** Command-line entry point; exits with the status returned by {@link #run}. */
    public static void main(String[] args) throws Exception {
        int exitCode = ToolRunner.run(new HourlyVisitCount(), args);
        System.exit(exitCode);
    }
}

运行MapReduce程序,从HDFS下载结果文件到本地:

hadoop jar HourlyVisitCount.jar /input /output/time

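# 新版MapReduce API的输出文件名为part-r-00000,这里用getmerge合并后保存为题目要求的part-00000
mkdir -p /root/internetlogs/time
hdfs dfs -getmerge /output/time /root/internetlogs/time/part-00000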

5.编写程序进行访问网站的浏览器标识统计,结果保存至本地/root/internetlogs/browser/目录下part-00000文件中,具体查看步骤说明
操作环境:Hadoop/Hive/Spark

/**
 * MapReduce job that counts visits per browser identifier (user agent).
 *
 * <p>Every input line is split on single spaces. With the field layout the other
 * jobs in this document rely on (0 = IP, 3 = timestamp, 6 = request path), index 7
 * is the protocol token, not the browser. The user agent is the quoted value
 * starting at index {@link #USER_AGENT_FIELD} and may span several tokens.
 * NOTE(review): this assumes the combined access-log layout with a space-free
 * referer at index 10 — confirm against journal.log and the task's step notes,
 * including whether the surrounding quotes should be kept in the output.
 *
 * <p>Lines with too few tokens are skipped and tallied in the
 * {@code BrowserCount/MALFORMED_LINES} counter.
 *
 * <p>Usage: {@code BrowserCount [inputPath [outputPath]]}. Missing arguments fall
 * back to {@code /input} and {@code /output/browser}.
 */
public class BrowserCount extends Configured implements Tool {
    /** Input path used when none is given on the command line. */
    private static final String DEFAULT_INPUT = "/input";
    /** Output path used when none is given on the command line. */
    private static final String DEFAULT_OUTPUT = "/output/browser";
    /** Index of the first user-agent token among the space-separated tokens of a line. */
    private static final int USER_AGENT_FIELD = 11;

    /**
     * Reassembles the user agent from the split tokens.
     *
     * <p>If the first token opens a quoted value, following tokens are appended
     * (separated by single spaces, as in the original line) up to and including the
     * token that ends with the closing quote; the surrounding quotes are removed.
     * An unquoted token is returned as is.
     *
     * @param fields the space-separated tokens of one log line
     * @return the user agent, or {@code null} if the line has no user-agent token
     */
    private static String extractUserAgent(String[] fields) {
        if (fields.length <= USER_AGENT_FIELD) {
            return null;
        }
        String first = fields[USER_AGENT_FIELD];
        if (!first.startsWith("\"")) {
            return first;
        }
        StringBuilder agent = new StringBuilder();
        for (int i = USER_AGENT_FIELD; i < fields.length; i++) {
            if (i > USER_AGENT_FIELD) {
                agent.append(' ');
            }
            agent.append(fields[i]);
            // length > 1 keeps a lone opening quote from being taken as the closing one.
            if (fields[i].endsWith("\"") && agent.length() > 1) {
                break;
            }
        }
        int end = agent.length();
        if (end > 1 && agent.charAt(end - 1) == '"') {
            end--;
        }
        return agent.substring(1, end);
    }

    /** Emits {@code (userAgent, 1)} for every log line that carries a user agent. */
    public static class BrowserMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
        // Writables are reused across calls; the framework serializes them on write().
        private static final IntWritable ONE = new IntWritable(1);
        private final Text browser = new Text();

        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String[] fields = value.toString().split(" ");
            String agent = extractUserAgent(fields);
            if (agent == null) {
                // Blank or truncated line: record it and move on rather than crash the task.
                context.getCounter("BrowserCount", "MALFORMED_LINES").increment(1);
                return;
            }
            browser.set(agent);
            context.write(browser, ONE);
        }
    }

    /** Sums the partial counts of one user agent; also used as the combiner. */
    public static class BrowserReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
        private final IntWritable total = new IntWritable();

        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable value : values) {
                sum += value.get();
            }
            total.set(sum);
            context.write(key, total);
        }
    }

    /**
     * Configures and runs the job.
     *
     * @param args optional {@code [inputPath, outputPath]}; defaults apply when absent
     * @return 0 if the job succeeded, 1 otherwise
     * @throws Exception if the job cannot be set up or submitted
     */
    @Override
    public int run(String[] args) throws Exception {
        String input = args.length > 0 ? args[0] : DEFAULT_INPUT;
        String output = args.length > 1 ? args[1] : DEFAULT_OUTPUT;

        Job job = Job.getInstance(getConf());
        job.setJarByClass(BrowserCount.class);
        job.setMapperClass(BrowserMapper.class);
        // Summation is associative and the reducer's input/output types match,
        // so it can pre-aggregate on the map side.
        job.setCombinerClass(BrowserReducer.class);
        job.setReducerClass(BrowserReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.addInputPath(job, new Path(input));
        FileOutputFormat.setOutputPath(job, new Path(output));
        return job.waitForCompletion(true) ? 0 : 1;
    }

    /** Command-line entry point; exits with the status returned by {@link #run}. */
    public static void main(String[] args) throws Exception {
        int exitCode = ToolRunner.run(new BrowserCount(), args);
        System.exit(exitCode);
    }
}

运行MapReduce程序,从HDFS下载结果文件到本地:

hadoop jar BrowserCount.jar /input /output/browser

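# 新版MapReduce API的输出文件名为part-r-00000,这里用getmerge合并后保存为题目要求的part-00000
mkdir -p /root/internetlogs/browser
hdfs dfs -getmerge /output/browser /root/internetlogs/browser/part-00000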

LazzMan 2024年9月27日 23:48 收藏文档