【云计算】Hadoop--MapReduce解析与案例
小标 2019-01-23 来源 : 阅读 662 评论 0

摘要:本文主要向大家介绍了【云计算】Hadoop--MapReduce解析与案例,通过具体的内容向大家展现,希望对大家学习云计算有所帮助。

本文主要向大家介绍了【云计算】Hadoop--MapReduce解析与案例,通过具体的内容向大家展现,希望对大家学习云计算有所帮助。

一、MapReduce工作机制


Job的submit()方法创建一个内部的JobSummiter实例,并且调用其submitJobInternal()方法。提交作业后,waitForCompletion()每秒轮训作业的进度,如果发现自上次报告后有改变,便把进度报告到控制台,作业完成后,如果成功,就显示作业计数器,如果失败,就将异常抛出。

JobSummiter作业提交过程如下:


向资源管理器请求一个新应用ID,用于mapreduce作业ID。 检查作业的输出说明。如:如果没有指定输出目录或者输出目录已经存在,作业就不提交。 计算作业输入分片。如果分片无法计算,比如因为输入路径下没有文件,作业就不提交。 将运行作业所需要的资源(包括作业JAR文件,配置文件和计算所得的输入分片)复制到一个以作业ID命名的目录下的共享文件系统中。作业JAR的副本较多(默认值为10),因此在运行作业的任务时,集群中有很多个副本可供节点管理器访问。 调用资源管理器的submitApplication()方法提交作业。

作业的初始化


资源管理器收到提交作业的消息后,便将请求传递到yarn调度器,调度器分配一个容器,然后资源管理器在节点管理器的管理下在容器中启动application master的进程 application master是一个java应用程序,它的主类是MRAppMaster。它将初始化作业,接受来自任务的进度和完成报告。 接下来,它接受来自共享文件系统的,在客户端计算的输入分片。然后对每一个分片创建一个map任务对象以及由mapreduce.job.reduces属性(或job.setNumReduceTasks()设置)确定的多个reduce任务对象,任务ID在此时分配。 application master必须决定如何运行mapreduce作业的各个任务。如果作业很小,则选择和自己在同一个jvm上运行。如果作业较大,则需要标记多jvm并行运行。(默认情况下,小作业就是少于10个mapper且只有1个reducer且输入大小小于一个HDFS块) 最后,在任何任务运行之前,会建立作业的最终输出目录及任务输出的临时工作空间。

任务的分配和执行


如果作业不适合并行,则application master就会为该作业向资源管理器申请maptask容器,在当前节点运行maptask 如果作业较大,则application master就会为该作业向资源管理器申请多个maptask容器,reourcemanager将运行maptask任务分配给另外两个NodeManager,另两个NodeManager分别领取任务并创建容器。 资源管理器向两个接收到任务的NodeManager发送程序启动脚本,这两个NodeManager分别启动maptask,maptask对数据分区排序。 接下来,application master向资源管理器申请reducetask容器 reduce task向maptask获取相应分区的数据。 程序运行完毕后,application master会向资源管理器注销自己。

任务的进度




- 当map任务或reduce任务运行时,子进程和自己的父进程application master通过umbilical接口进行通信,每隔3秒钟,任务通过umbilical接口向自己的application master报告进度和状态(包括计数器),application master会形成一个作业的汇聚视图。

- 资源管理器的界面显示了所有运行中的应用程序,当客户端请求查看进度时,资源管理器会有链接指向这些应用各自的application master的界面,这些界面展示了mapreduce作业的更多细节,包括进度。


作业的完成


当application master收到作业最后一个任务已完成的通知后,便把作业的状态设置成“成功”。然后,在Job轮询状态时,便已经知道任务已成功完成,于是job打印一条消息告知用户,然后从waitforCompletion()方法返回。Job的统计信息和计数值也在这个时候输出到控制台。


二、MapTask工作机制


Read阶段:Map Task通过用户编写的RecordReader,从输入InputSplit中解析出一个个key/value。 Map阶段:该节点主要是将解析出的key/value交给用户编写map()函数处理,并产生一系列新的key/value。 Collect阶段:在用户编写map()函数中,当数据处理完成后,一般会调用OutputCollector.collect()输出结果。在该函数内部,它会将生成的key/value分区(调用Partitioner),并写入一个环形内存缓冲区中。

Spill阶段:即“溢写”,当环形缓冲区满后,MapReduce会将数据写到本地磁盘上,生成一个临时文件。需要注意的是,将数据写入本地磁盘之前,先要对数据进行一次本地排序,并在必要时对数据进行合并、压缩等操作。

溢写阶段详情:

步骤1:利用快速排序算法对缓存区内的数据进行排序,排序方式是,先按照分区编号partition进行排序,然后按照key进行排序。这样,经过排序后,数据以分区为单位聚集在一起,且同一分区内所有数据按照key有序。

步骤2:按照分区编号由小到大依次将每个分区中的数据写入任务工作目录下的临时文件output/spillN.out(N表示当前溢写次数)中。如果用户设置了Combiner,则写入文件之前,对每个分区中的数据进行一次聚集操作。

步骤3:将分区数据的元信息写到内存索引数据结构SpillRecord中,其中每个分区的元信息包括在临时文件中的偏移量、压缩前数据大小和压缩后数据大小。如果当期内存索引大小超过1MB,则将内存索引写到文件output/spillN.out.index中。


Combine阶段:当所有数据处理完成后,MapTask对所有临时文件进行一次合并,以确保最终只会生成一个数据文件。


当所有数据处理完后,MapTask会将所有临时文件合并成一个大文件,并保存到文件output/file.out中,同时生成相应的索引文件output/file.out.index。

在进行文件合并过程中,MapTask以分区为单位进行合并。对于某个分区,它将采用多轮递归合并的方式。每轮合并io.sort.factor(默认100)个文件,并将产生的文件重新加入待合并列表中,对文件排序后,重复以上过程,直到最终得到一个大文件。

让每个MapTask最终只生成一个数据文件,可避免同时打开大量文件和同时读取大量小文件产生的随机读取带来的开销。

三、Reduce工作机制


Copy阶段:ReduceTask从各个MapTask上远程拷贝一片数据,并针对某一片数据,如果其大小超过一定阈值,则写到磁盘上,否则直接放到内存中。 Merge阶段:在远程拷贝数据的同时,ReduceTask启动了两个后台线程对内存和磁盘上的文件进行合并,以防止内存使用过多或磁盘上文件过多。 Sort阶段:按照MapReduce语义,用户编写reduce()函数输入数据是按key进行聚集的一组数据。为了将key相同的数据聚在一起,Hadoop采用了基于排序的策略。由于各个MapTask已经实现对自己的处理结果进行了局部排序,因此,ReduceTask只需对所有数据进行一次归并排序即可。 Reduce阶段:reduce()函数将计算结果写到HDFS上。

四、Shuffle机制




Mapreduce确保每个reducer的输入都是按键排序的。系统执行排序的过程(即将map输出作为输入传给reducer)称为shuffle。


MapReduce工作流程




流程详解

上面的流程是整个mapreduce工作流程,但是shuffle过程只是从第7步开始到第16步结束,具体shuffle过程详解,如下:


maptask收集我们的map()方法输出的kv对,放到内存缓冲区中 从内存缓冲区不断溢出本地磁盘文件,可能会溢出多个文件 多个溢出文件会被合并成大的溢出文件 在溢出过程中,及合并的过程中,都要调用partitoner进行分组和针对key进行排序 reducetask根据自己的分区号,去各个maptask机器上取相应的结果分区数据 reducetask会取到同一个分区的来自不同maptask的结果文件,reducetask会将这些文件再进行合并(归并排序) 合并成大文件后,shuffle的过程也就结束了,后面进入reducetask的逻辑运算过程(从文件中取出一个一个的键值对group,调用用户自定义的reduce()方法)

注意

Shuffle中的缓冲区大小会影响到mapreduce程序的执行效率,原则上说,缓冲区越大,磁盘io的次数越少,执行速度就越快。


五、MapReduce案例


1、WordCount


在一堆给定的文本文件中统计输出每一个单词出现的总次数。

分析:


maptask是一行一行读取文件,对于每一行数据,都调用一次map()方法。 拿到一行数据,使用空格作为分割符,将一行字符串分割成单词。将单词输出作为reduce的输入。 在中间shuffle过程,mapreduce框架会自动分区排序合并,传入到reduce的结果是键-Iterator(值)形式

-在reduce中遍历Iterator中所有值,加一,输出到文件

WordCountMapper.java


package com.demo.mapreduce.wordcount;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

/**
 * @author xyd
 * @version V1.0
 * @Package com.demo.mapreduce.wordcount
 * @Description:
 * @date 2018/8/13 15:10
 */
public class WordCountMapper extends Mapper {

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // 获取一行数据,
        String line = value.toString();
        // 获取每个单词
        String[] words = line.trim().split(" ");
        // 输出每个单词
        for (String word : words){
            context.write(new Text(word), new LongWritable(1));
        }

    }
}


WordCountReducer.java


package com.demo.mapreduce.wordcount;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

/**
 * @author xyd
 * @version V1.0
 * @Package com.demo.mapreduce.wordcount
 * @Description:
 * @date 2018/8/13 15:38
 */
public class WordCountReducer extends Reducer {

    @Override
    protected void reduce(Text key, Iterable values, Context context) throws IOException, InterruptedException {

        // 统计所有单词的个数
        long count = 0;
        for (LongWritable value : values){
            count += value.get();
        }
        // 输出所有单词的个数
        context.write(key, new LongWritable(count));
    }
}


WordCountDriver.java


package com.demo.mapreduce.wordcount;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

/**
 * @author xyd
 * @version V1.0
 * @Package com.demo.mapreduce.wordcount
 * @Description:驱动主程序
 * @date 2018/8/13 15:46
 */
public class WordCountDriver {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {

        // 获取job对象信息
        Configuration configuration = new Configuration();
        Job job = Job.getInstance(configuration);

        // 加载jar位置
        job.setJarByClass(WordCountDriver.class);

        // 设置mapreduce的class
        job.setMapperClass(WordCountMapper.class);
        job.setReducerClass(WordCountReducer.class);

        // 设置输出mapper的数据类型
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(LongWritable.class);

        // 设置最终数据输出的类型
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);

        // 设置输入,输出的文件路径
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        boolean result = job.waitForCompletion(true);

        System.exit(result  0 : 1);
    }
}


打包成jar包,放入集群中!

输入文件:

areuok.txt


Thank you
Are you ok
Hello
Thank you
Thank you very much
Hello
Thank you
Thank you very much
He
He hello
Thank you
Thank you very much
He
Hehe
Hello
Thank you
Thank you very much
How are you indian mi fans
Do you like mi 4i
Ok indian mi fans
Do you like mi band
We will give everyone
A free mi band
And me
Mi fans
Do you like
I'm very happy to
To be aan indian
I'm very happy to
To be a gift
I'm a free gift
For every-everyone
Do you like me
Yeeeeeeeh
Thank you very much
Oh indian mi fans
Are you ok
Are you ok
Yeeeeeeeh
Oh everyone
Are you ok
Are you ok
I I I I I I I
Mean
Are you ok
Are you ok
I'm very ok
Oh indian mi fans
Are you ok
Are you ok
Oh china mi fans
Are you ok
Are you ok
I I I I I I I
Mean
How are you
How are you
I'm fine thank you
Are you ok
Are you ok
We will give everyone a band
Are you ok
We will give a band to everyone
Are you ok
We will give a colourful strap
All for free
I'm very happy
Ha happy
Oh indian mi fans
Are you ok
Are you ok
Yeeeeeeeh
Oh everyone
Are you ok
Are you ok
I I I I I I I
Mean
Are you ok
Are you ok
I'm very ok
Once again
Oh indian mi fans
Are you ok
Are you ok
Oh china mi fans
Are you ok
Are you ok
I I I I I I I
Mean
How are you
How are you
I'm fine thank you
Are you ok


运行jar:


[root@hadoop001 hadoop-2.6.5]# hadoop jar demo.jar /user/data/input/areuok.txt /user/data/output/


运行结果:

part-r-00000


4i  1
A   1
All 1
And 1
Are 26
Do  4
For 1
Ha  1
He  3
Hehe    1
Hello   3
How 5
I   28
I'm 8
Mean    4
Mi  1
Oh  8
Ok  1
Once    1
Thank   10
To  2
We  4
Yeeeeeeeh   3
a   5
aan 1
again   1
are 5
band    4
be  2
china   2
colourful   1
every-everyone  1
everyone    5
fans    9
fine    2
for 1
free    3
gift    2
give    4
happy   4
hello   1
indian  7
like    4
me  2
mi  11
much    5
ok  28
strap   1
thank   2
to  3
very    10
will    4
you 47


2、WordcountCombiner


combiner是MR程序中Mapper和Reducer之外的一种组件 combiner组件的父类就是Reducer combiner和reducer的区别在于运行的位置:

Combiner是在每一个maptask所在的节点运行

Reducer是接收全局所有Mapper的输出结果; combiner的意义就是对每一个maptask的输出进行局部汇总,以减小网络传输量

此案例是对于wordcount的升级,在每个maptask运行分区之后先归并一次,然后再输入reduce处理。

分析:


其他步骤同wordcount相同 自定义一个combiner继承Reducer,重写reduce方法 最后在driver中需要设置combiner

WordCountCombiner.java


package com.demo.mapreduce.wordcount;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

/**
 * @author xyd
 * @version V1.0
 * @Package com.demo.mapreduce.wordcount
 * @Description:
 * @date 2018/8/14 14:30
 */
public class WordCountCombiner extends Reducer {
    LongWritable longWritable = new LongWritable();
    // 传入数据例子:,输出目标:
    @Override
    protected void reduce(Text key, Iterable values, Context context) throws IOException, InterruptedException {
        // 计算累加
        long count = 0;
        for (LongWritable longWritable : values){
            count += longWritable.get();
        }
        longWritable.set(count);
        context.write(key, longWritable);
    }
}


WordCountDriver.java


package com.demo.mapreduce.wordcount;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

/**
 * @author xyd
 * @version V1.0
 * @Package com.demo.mapreduce.wordcount
 * @Description:驱动主程序
 * @date 2018/8/13 15:46
 */
public class WordCountDriver {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {

        // 获取job对象信息
        Configuration configuration = new Configuration();
        Job job = Job.getInstance(configuration);

        // 加载jar位置
        job.setJarByClass(WordCountDriver.class);

        // 设置mapreduce的class
        job.setMapperClass(WordCountMapper.class);
        job.setReducerClass(WordCountReducer.class);

        // 设置输出mapper的数据类型
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(LongWritable.class);

        // 设置最终数据输出的类型
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);

        // 设置输入,输出的文件路径
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // 关联combiner
        job.setCombinerClass(WordCountCombiner.class);

        boolean result = job.waitForCompletion(true);

        System.exit(result  0 : 1);
    }
}


输入文件同上

运行结果:


4i  1
A   1
All 1
And 1
Are 26
Do  4
For 1
Ha  1
He  3
Hehe    1
Hello   3
How 5
I   28
I'm 8
Mean    4
Mi  1
Oh  8
Ok  1
Once    1
Thank   10
To  2
We  4
Yeeeeeeeh   3
a   5
aan 1
again   1
are 5
band    4
be  2
china   2
colourful   1
every-everyone  1
everyone    5
fans    9
fine    2
for 1
free    3
gift    2
give    4
happy   4
hello   1
indian  7
like    4
me  2
mi  11
much    5
ok  28
strap   1
thank   2
to  3
very    10
will    4
you 47


3、流量汇总


需求:

统计手机号耗费的总上行流量、下行流量、总流量(序列化)


分析:


mapper阶段:读取一行数据,切分字段 抽取手机号、上行流量、下行流量 以手机号为key,bean对象为value输出,即context.write(手机号,bean); reduce阶段:累加上行流量和下行流量得到总流量。 实现自定义的bean来封装流量信息,并将bean作为map输出的key来传输 MR程序在处理数据的过程中会对数据排序(map输出的kv对传输到reduce之前,会排序),排序的依据是map输出的key

FlowBean.java


package com.demo.mapreduce.flow;

import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

/**
 * @author xyd
 * @version V1.0
 * @Package com.demo.mapreduce.flow
 * @Description:
 * @date 2018/8/13 17:01
 */
public class FlowBean implements Writable{

    private long upFlow;
    private long downFlow;
    private long sumFlow;

    public FlowBean() {
    }

    public FlowBean(long upFlow, long downFlow) {
        this.upFlow = upFlow;
        this.downFlow = downFlow;
        this.sumFlow = upFlow + downFlow;
    }

    public long getUpFlow() {
        return upFlow;
    }

    public void setUpFlow(long upFlow) {
        this.upFlow = upFlow;
    }

    public long getDownFlow() {
        return downFlow;
    }

    public void setDownFlow(long downFlow) {
        this.downFlow = downFlow;
    }

    public long getSumFlow() {
        return sumFlow;
    }

    public void setSumFlow(long sumFlow) {
        this.sumFlow = sumFlow;
    }

    public void set(long upFlow, long downFlow) {
        this.upFlow = upFlow;
        this.downFlow = downFlow;
        this.sumFlow = upFlow + downFlow;
    }
    // 序列化
    @Override
    public void write(DataOutput dataOutput) throws IOException {
        dataOutput.writeLong(upFlow);
        dataOutput.writeLong(downFlow);
        dataOutput.writeLong(sumFlow);
    }

    // 反序列化
    @Override
    public void readFields(DataInput dataInput) throws IOException {
        this.upFlow = dataInput.readLong();
        this.downFlow = dataInput.readLong();
        this.sumFlow = dataInput.readLong();
    }

    @Override
    public String toString() {
        return upFlow + "\t" + downFlow + "\t" + sumFlow;
    }

}


FlowMapper.java


package com.demo.mapreduce.flow;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

/**
 * @author xyd
 * @version V1.0
 * @Package com.demo.mapreduce.flow
 * @Description:
 * @date 2018/8/13 17:09
 */
public class FlowMapper extends Mapper {
    private FlowBean flowBean = new FlowBean();
    private Text k = new Text();
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

        // 获取一行数据
        String line = value.toString();

        // 截取数据
        String[] fields = line.split("\t");

        // 封装bean对象以及获取电话号码key
        String phoneNumber = fields[1];
        long upFlow = Long.parseLong(fields[fields.length - 3]);
        long downFlow = Long.parseLong(fields[fields.length - 2]);
        flowBean.set(upFlow, downFlow);
        k.set(phoneNumber);
        // 写出去
        context.write(k, flowBean);
    }
}


FlowReducer.java


package com.demo.mapreduce.flow;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

/**
 * @author xyd
 * @version V1.0
 * @Package com.demo.mapreduce.flow
 * @Description:
 * @date 2018/8/13 17:23
 */
public class FlowReducer extends Reducer {

    @Override
    protected void reduce(Text key, Iterable values, Context context) throws IOException, InterruptedException {

        long upFlow = 0;
        long downFlow = 0;
        // 将同一号码的流量使用累加
        for(FlowBean flowBean : values){
            upFlow += flowBean.getUpFlow();
            downFlow += flowBean.getDownFlow();
        }

        // 输出
        context.write(key, new FlowBean(upFlow, downFlow));
    }
}


FlowDriver.java


package com.demo.mapreduce.flow;

import com.demo.mapreduce.wordcount.WordCountMapper;
import com.demo.mapreduce.wordcount.WordCountReducer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

/**
 * @author xyd
 * @version V1.0
 * @Package com.demo.mapreduce.flow
 * @Description:
 * @date 2018/8/13 17:29
 */
public class FlowDriver {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        // 获取job对象信息
        Configuration configuration = new Configuration();
        Job job = Job.getInstance(configuration);

        // 加载jar位置
        job.setJarByClass(FlowDriver.class);

        // 设置mapreduce的class
        job.setMapperClass(FlowMapper.class);
        job.setReducerClass(FlowReducer.class);

        // 设置输出mapper的数据类型
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(FlowBean.class);

        // 设置最终数据输出的类型
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FlowBean.class);

        // 设置输入,输出的文件路径
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        boolean result = job.waitForCompletion(true);

        System.exit(result  0 : 1);
    }
}


输入数据:


phone_data.txt


1360973757112   13609737571 00-fc-sd-4a-we-07:CMCC  192.168.0.154   315613245861    24  2451    24887   200

1360973757112   13609737571 00-fc-sd-4a-we-07:CMCC  192.168.0.154   315613245861    24  2452    24812   200

1360973757112   13709737572 00-fc-sd-4a-we-07:CMCC  192.168.0.154   315613245861    24  2453    24827   200

1360973757112   13709737572 00-fc-sd-4a-we-07:CMCC  192.168.0.154   315613245861    24  24548   24834   200

1360973757112   13509737573 00-fc-sd-4a-we-07:CMCC  192.168.0.154   315613245861    24  2455    2483    200

1360973757112   13509737574 00-fc-sd-4a-we-07:CMCC  192.168.0.154   315613245861    24  2456    24856   200

1360973757112   13809737575 00-fc-sd-4a-we-07:CMCC  192.168.0.154   315613245861    24  2457    2485    200

1360973757112   13809737576 00-fc-sd-4a-we-07:CMCC  192.168.0.154   315613245861    24  2458    24887   200

1360973757112   13909737574 00-fc-sd-4a-we-07:CMCC  192.168.0.154   315613245861    24  2459    24877   200

1360973757112   13909737575 00-fc-sd-4a-we-07:CMCC  192.168.0.154   315613245861    24  2450    24897   200

1360973757112   13209737578 00-fc-sd-4a-we-07:CMCC  192.168.0.154   315613245861    24  24518   24807   200

1360973757112   13209737579 00-fc-sd-4a-we-07:CMCC  192.168.0.154   315613245861    24  24528   24817   200

1360973757112   13609737578 00-fc-sd-4a-we-07:CMCC  192.168.0.154   315613245861    24  24568   24882   200

1360973757112   13509737579 00-fc-sd-4a-we-07:CMCC  192.168.0.154   315613245861    24  24538   24883   200

1360973757112   13509737570 00-fc-sd-4a-we-07:CMCC  192.168.0.154   315613245861    24  24548   24884   200


运行命令:


[root@hadoop001 hadoop-2.6.5]# hadoop jar demo.jar /user/data/input/phone_data.txt /user/data/output/


运行结果:


这里写代码片


4、手机流量汇总(map输出分区)


在maptask运行完后将手机号码按不同开头分不同区输出不同的文件


分析:


Mapreduce中会将map输出的kv对,按照相同key分组,然后分发给不同的reducetask。默认的分发规则为:根据key的hashcode%reducetask数来分发 如果要按照我们自己的需求进行分组,则需要改写数据分发(分组)组件Partitioner

自定义一个CustomPartitioner继承抽象类:Partitioner 在job驱动中,设置自定义partitioner:job.setPartitionerClass(CustomPartitioner.class) 在第三3步的基础上,增加分区类。

FlowPartitioner.java


package com.demo.mapreduce.flow;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

/**
 * @author xyd
 * @version V1.0
 * @Package com.demo.mapreduce.flow
 * @Description:分区,
 * @date 2018/8/14 9:55
 */
public class FlowPartitioner extends Partitioner {
    @Override
    public int getPartition(Text text, FlowBean flowBean, int i) {
        // 拿到电话号码前三位来分区
        String phoneNum = text.toString().substring(0, 3);

        int partitions = 4;
        if ("135".equals(phoneNum)){
            partitions = 0;
        }else if ("136".equals(phoneNum)){
            partitions = 1;
        }else if ("137".equals(phoneNum)){
            partitions = 2;
        }else if ("138".equals(phoneNum)){
            partitions = 3;
        }
        return partitions;
    }
}


FlowDriver.java


package com.demo.mapreduce.flow;

import com.demo.mapreduce.wordcount.WordCountMapper;
import com.demo.mapreduce.wordcount.WordCountReducer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

/**
 * @author xyd
 * @version V1.0
 * @Package com.demo.mapreduce.flow
 * @Description:
 * @date 2018/8/13 17:29
 */
public class FlowDriver {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        // 获取job对象信息
        Configuration configuration = new Configuration();
        Job job = Job.getInstance(configuration);

        // 加载jar位置
        job.setJarByClass(FlowDriver.class);

        // 设置mapreduce的class
        job.setMapperClass(FlowMapper.class);
        job.setReducerClass(FlowReducer.class);

        // 设置输出mapper的数据类型
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(FlowBean.class);

        // 设置最终数据输出的类型
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FlowBean.class);

        // 设置分区
        job.setPartitionerClass(FlowPartitioner.class);
        // 设置reduce个数
        job.setNumReduceTasks(5);

        // 设置输入,输出的文件路径
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        boolean result = job.waitForCompletion(true);

        System.exit(result  0 : 1);
    }
}


输入数据同上

输出结果:

可以看到,输出了5个文件,说明启动了5个reducetask

part-r-00000


13509737570 24548   24884   49432
13509737573 2455    2483    4938
13509737574 2456    24856   27312
13509737579 24538   24883   49421


part-r-00001


13609737571 4903    49699   54602
13609737578 24568   24882   49450


part-r-00002


13709737572 27001   49661   76662


part-r-00003


13809737575 2457    2485    4942
13809737576 2458    24887   27345


part-r-00004


13209737578 24518   24807   49325
13209737579 24528   24817   49345
13909737574 2459    24877   27336
13909737575 2450    24897   27347


5、手机流量汇总(按总流量多少排序)


在第3的基础上,将输出的总流量进行排序。

分析:


把程序分两步走,第一步正常统计总流量,第二步再把结果进行排序 context.write(总流量,手机号) FlowBean实现WritableComparable接口重写compareTo方法

FlowBean.java


package com.demo.mapreduce.flowsort;

import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

/**
 * @author xyd
 * @version V1.0
 * @Package com.demo.mapreduce.flow
 * @Description:
 * @date 2018/8/13 17:01
 */
public class FlowBean implements Writable, WritableComparable {

    private long upFlow;
    private long downFlow;
    private long sumFlow;

    public FlowBean() {
    }

    public FlowBean(long upFlow, long downFlow) {
        this.upFlow = upFlow;
        this.downFlow = downFlow;
        this.sumFlow = upFlow + downFlow;
    }

    public long getUpFlow() {
        return upFlow;
    }

    public void setUpFlow(long upFlow) {
        this.upFlow = upFlow;
    }

    public long getDownFlow() {
        return downFlow;
    }

    public void setDownFlow(long downFlow) {
        this.downFlow = downFlow;
    }

    public long getSumFlow() {
        return sumFlow;
    }

    public void setSumFlow(long sumFlow) {
        this.sumFlow = sumFlow;
    }

    public void set(long upFlow, long downFlow) {
        this.upFlow = upFlow;
        this.downFlow = downFlow;
        this.sumFlow = upFlow + downFlow;
    }
    // 序列化
    @Override
    public void write(DataOutput dataOutput) throws IOException {
        dataOutput.writeLong(upFlow);
        dataOutput.writeLong(downFlow);
        dataOutput.writeLong(sumFlow);
    }

    // 反序列化
    @Override
    public void readFields(DataInput dataInput) throws IOException {
        this.upFlow = dataInput.readLong();
        this.downFlow = dataInput.readLong();
        this.sumFlow = dataInput.readLong();
    }

    @Override
    public String toString() {
        return upFlow + "\t" + downFlow + "\t" + sumFlow;
    }

    @Override
    public int compareTo(FlowBean o) {
        return this.sumFlow > o.getSumFlow()  -1 : 1;
    }
}


FlowSortMapper.java


package com.demo.mapreduce.flowsort;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

/**
 * @author xyd
 * @version V1.0
 * @Package com.demo.mapreduce.flow
 * @Description:
 * @date 2018/8/13 17:09
 */
public class FlowSortMapper extends Mapper {
    private FlowBean flowBean = new FlowBean();
    private Text v = new Text();
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

        // 获取一行数据
        String line = value.toString();

        // 截取数据
        String[] fields = line.split("\t");

        // 封装bean对象以及获取电话号码key
        long upFlow = Long.parseLong(fields[1]);
        long downFlow = Long.parseLong(fields[2]);
        flowBean.set(upFlow, downFlow);
        String phoneNumber = fields[0];
        v.set(phoneNumber);
        // 写出去
        context.write(flowBean, v);
    }
}


FlowSortReducer.java


package com.demo.mapreduce.flowsort;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

/**
 * @author xyd
 * @version V1.0
 * @Package com.demo.mapreduce.flowsort
 * @Description:
 * @date 2018/8/14 10:25
 */
public class FlowSortReducer extends Reducer {

    @Override
    protected void reduce(FlowBean key, Iterable values, Context context) throws IOException, InterruptedException {
        Text k = values.iterator().next();
        context.write(k, key);
    }
}


FlowSortDriver.java


package com.demo.mapreduce.flowsort;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

/**
 * @author xyd
 * @version V1.0
 * @Package com.demo.mapreduce.flowsort
 * @Description:
 * @date 2018/8/14 10:30
 */
public class FlowSortDriver {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        // 获取job对象信息
        Configuration configuration = new Configuration();
        Job job = Job.getInstance(configuration);

        // 加载jar位置
        job.setJarByClass(FlowSortDriver.class);

        // 设置mapreduce的class
        job.setMapperClass(FlowSortMapper.class);
        job.setReducerClass(FlowSortReducer.class);

        // 设置输出mapper的数据类型
        job.setMapOutputKeyClass(FlowBean.class);
        job.setMapOutputValueClass(Text.class);

        // 设置最终数据输出的类型
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FlowBean.class);

        // 设置输入,输出的文件路径
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        boolean result = job.waitForCompletion(true);

        System.exit(result  0 : 1);
    }
}


输入文件:


13609737571 2451    24887
13609737571 2452    24812
13709737572 2453    24827
13709737572 24548   24834
13509737573 2455    2483
13509737574 2456    24856
13809737575 2457    2485
13809737576 2458    24887
13909737574 2459    24877
13909737575 2450    24897
13209737578 24518   24807
13209737579 24528   24817
13609737578 24568   24882
13509737579 24538   24883
13509737570 24548   24884


运行结果:


13609737578 24568   24882   49450
13509737570 24548   24884   49432
13509737579 24538   24883   49421
13709737572 24548   24834   49382
13209737579 24528   24817   49345
13209737578 24518   24807   49325
13909737575 2450    24897   27347
13809737576 2458    24887   27345
13609737571 2451    24887   27338
13909737574 2459    24877   27336
13509737574 2456    24856   27    

本文由职坐标整理并发布,希望对同学们有所帮助。了解更多详情请关注职坐标大数据云计算大数据安全频道!

本文由 @小标 发布于职坐标。未经许可,禁止转载。
喜欢 | 0 不喜欢 | 0
看完这篇文章有何感觉?已经有0人表态,0%的人喜欢 快给朋友分享吧~
评论(0)
后参与评论

您输入的评论内容中包含违禁敏感词

我知道了

助您圆梦职场 匹配合适岗位
验证码手机号,获得海同独家IT培训资料
选择就业方向:
人工智能物联网
大数据开发/分析
人工智能Python
Java全栈开发
WEB前端+H5

请输入正确的手机号码

请输入正确的验证码

获取验证码

您今天的短信下发次数太多了,明天再试试吧!

提交

我们会在第一时间安排职业规划师联系您!

您也可以联系我们的职业规划师咨询:

小职老师的微信号:z_zhizuobiao
小职老师的微信号:z_zhizuobiao

版权所有 职坐标-一站式IT培训就业服务领导者 沪ICP备13042190号-4
上海海同信息科技有限公司 Copyright ©2015 www.zhizuobiao.com,All Rights Reserved.
 沪公网安备 31011502005948号    

©2015 www.zhizuobiao.com All Rights Reserved

208小时内训课程